xref: /oneTBB/include/oneapi/tbb/flow_graph.h (revision 3e352b48)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #include <atomic>
21 #include <memory>
22 #include <type_traits>
23 
24 #include "detail/_config.h"
25 #include "detail/_namespace_injection.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "detail/_pipeline_filters.h"
31 #include "detail/_task.h"
32 #include "detail/_small_object_pool.h"
33 #include "cache_aligned_allocator.h"
34 #include "detail/_exception.h"
35 #include "detail/_template_helpers.h"
36 #include "detail/_aggregator.h"
37 #include "detail/_allocator_traits.h"
38 #include "detail/_utils.h"
39 #include "profiling.h"
40 #include "task_arena.h"
41 
42 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
43    #if __INTEL_COMPILER
44        // Disabled warning "routine is both inline and noinline"
45        #pragma warning (push)
46        #pragma warning( disable: 2196 )
47    #endif
48    #define __TBB_NOINLINE_SYM __attribute__((noinline))
49 #else
50    #define __TBB_NOINLINE_SYM
51 #endif
52 
53 #include <tuple>
54 #include <list>
55 #include <queue>
56 #if __TBB_CPP20_CONCEPTS_PRESENT
57 #include <concepts>
58 #endif
59 
/** @file
  \brief The graph related classes and functions

  There are some applications that best express dependencies as messages
  passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
  class and its associated node classes can be used to express such
  applications.
*/
69 
70 namespace tbb {
71 namespace detail {
72 
73 namespace d1 {
74 
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
/** The enumerators have the fixed values 0 (unlimited) and 1 (serial). */
enum concurrency { unlimited = 0, serial = 1 };

//! A generic null type
/** Empty tag type; later in this file it stands in for a type a node does not have
    (e.g. input_node::input_type). */
struct null_type {};

//! An empty class used for messages that mean "I'm done"
/** Carries no data; it is the message type received by continue_receiver. */
class continue_msg {};
83 
84 } // namespace d1
85 
86 #if __TBB_CPP20_CONCEPTS_PRESENT
namespace d0 {

//! Return-type constraint shared by the node body concepts below.
/** When used as a type-constraint (`-> node_body_return_type<Output>`), the type of the
    constrained expression is substituted for ReturnType and Output for OutputType.
    Satisfied when OutputType is continue_msg, or when OutputType is exactly ReturnType. */
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
                                std::same_as<OutputType, ReturnType>;

//! Body is copy-constructible and callable on an lvalue with a const continue_msg&,
//! with a result type accepted by node_body_return_type<Output>.
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };

//! Body is copy-constructible and callable on an lvalue with a const Input&,
//! with a result type accepted by node_body_return_type<Output>.
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const Input& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };

//! FunctionObject is copy-constructible and callable with a const Input&; the result must
//! satisfy adaptive_same_as<Key> (that concept is defined outside this part of the file).
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
                                    requires( FunctionObject& func, const Input& v ) {
                                        { func(v) } -> adaptive_same_as<Key>;
                                    };

//! Body is copy-constructible and callable with a flow_control&; the result must
//! satisfy adaptive_same_as<Output>.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, tbb::detail::d1::flow_control& fc ) {
                              { body(fc) } -> adaptive_same_as<Output>;
                          };

//! Body is copy-constructible and callable as body(input, ports); the return type is unconstrained.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
                                  requires( Body& body, const Input& v, OutputPortsType& p ) {
                                      body(v, p);
                                  };

//! Sequencer is copy-constructible and callable with a const Value&; the result must
//! satisfy adaptive_same_as<std::size_t>.
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
                    requires( Sequencer& seq, const Value& value ) {
                        { seq(value) } -> adaptive_same_as<std::size_t>;
                    };

//! Body is copy-constructible and callable as body(input, gateway); the return type is unconstrained.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, const Input& v, GatewayType& gateway ) {
                              body(v, gateway);
                          };

} // namespace d0
136 #endif // __TBB_CPP20_CONCEPTS_PRESENT
137 
138 namespace d1 {
139 
140 //! Forward declaration section
141 template< typename T > class sender;
142 template< typename T > class receiver;
143 class continue_receiver;
144 
145 template< typename T, typename U > class limiter_node;  // needed for resetting decrementer
146 
147 template<typename T, typename M> class successor_cache;
148 template<typename T, typename M> class broadcast_cache;
149 template<typename T, typename M> class round_robin_cache;
150 template<typename T, typename M> class predecessor_cache;
151 template<typename T, typename M> class reservable_predecessor_cache;
152 
153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
154 namespace order {
155 struct following;
156 struct preceding;
157 }
158 template<typename Order, typename... Args> struct node_set;
159 #endif
160 
161 
162 } // namespace d1
163 } // namespace detail
164 } // namespace tbb
165 
166 //! The graph class
167 #include "detail/_flow_graph_impl.h"
168 
169 namespace tbb {
170 namespace detail {
171 namespace d1 {
172 
173 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
174     if (second->priority > first->priority)
175         return std::make_pair(second, first);
176     return std::make_pair(first, second);
177 }
178 
179 // submit task if necessary. Returns the non-enqueued task if there is one.
180 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
181     // if no RHS task, don't change left.
182     if (right == nullptr) return left;
183     // right != nullptr
184     if (left == nullptr) return right;
185     if (left == SUCCESSFULLY_ENQUEUED) return right;
186     // left contains a task
187     if (right != SUCCESSFULLY_ENQUEUED) {
188         // both are valid tasks
189         auto tasks_pair = order_tasks(left, right);
190         spawn_in_graph_arena(g, *tasks_pair.first);
191         return tasks_pair.second;
192     }
193     return left;
194 }
195 
196 //! Pure virtual template class that defines a sender of messages of type T
197 template< typename T >
198 class sender {
199 public:
200     virtual ~sender() {}
201 
202     //! Request an item from the sender
203     virtual bool try_get( T & ) { return false; }
204 
205     //! Reserves an item in the sender
206     virtual bool try_reserve( T & ) { return false; }
207 
208     //! Releases the reserved item
209     virtual bool try_release( ) { return false; }
210 
211     //! Consumes the reserved item
212     virtual bool try_consume( ) { return false; }
213 
214 protected:
215     //! The output type of this sender
216     typedef T output_type;
217 
218     //! The successor type for this node
219     typedef receiver<T> successor_type;
220 
221     //! Add a new successor to this node
222     virtual bool register_successor( successor_type &r ) = 0;
223 
224     //! Removes a successor from this node
225     virtual bool remove_successor( successor_type &r ) = 0;
226 
227     template<typename C>
228     friend bool register_successor(sender<C>& s, receiver<C>& r);
229 
230     template<typename C>
231     friend bool remove_successor  (sender<C>& s, receiver<C>& r);
232 };  // class sender<T>
233 
234 template<typename C>
235 bool register_successor(sender<C>& s, receiver<C>& r) {
236     return s.register_successor(r);
237 }
238 
239 template<typename C>
240 bool remove_successor(sender<C>& s, receiver<C>& r) {
241     return s.remove_successor(r);
242 }
243 
244 //! Pure virtual template class that defines a receiver of messages of type T
245 template< typename T >
246 class receiver {
247 public:
248     //! Destructor
249     virtual ~receiver() {}
250 
251     //! Put an item to the receiver
252     bool try_put( const T& t ) {
253         graph_task *res = try_put_task(t);
254         if (!res) return false;
255         if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
256         return true;
257     }
258 
259     //! put item to successor; return task to run the successor if possible.
260 protected:
261     //! The input type of this receiver
262     typedef T input_type;
263 
264     //! The predecessor type for this node
265     typedef sender<T> predecessor_type;
266 
267     template< typename R, typename B > friend class run_and_put_task;
268     template< typename X, typename Y > friend class broadcast_cache;
269     template< typename X, typename Y > friend class round_robin_cache;
270     virtual graph_task *try_put_task(const T& t) = 0;
271     virtual graph& graph_reference() const = 0;
272 
273     template<typename TT, typename M> friend class successor_cache;
274     virtual bool is_continue_receiver() { return false; }
275 
276     // TODO revamp: reconsider the inheritance and move node priority out of receiver
277     virtual node_priority_t priority() const { return no_priority; }
278 
279     //! Add a predecessor to the node
280     virtual bool register_predecessor( predecessor_type & ) { return false; }
281 
282     //! Remove a predecessor from the node
283     virtual bool remove_predecessor( predecessor_type & ) { return false; }
284 
285     template <typename C>
286     friend bool register_predecessor(receiver<C>& r, sender<C>& s);
287     template <typename C>
288     friend bool remove_predecessor  (receiver<C>& r, sender<C>& s);
289 }; // class receiver<T>
290 
291 template <typename C>
292 bool register_predecessor(receiver<C>& r, sender<C>& s) {
293     return r.register_predecessor(s);
294 }
295 
296 template <typename C>
297 bool remove_predecessor(receiver<C>& r, sender<C>& s) {
298     return r.remove_predecessor(s);
299 }
300 
301 //! Base class for receivers of completion messages
302 /** These receivers automatically reset, but cannot be explicitly waited on */
303 class continue_receiver : public receiver< continue_msg > {
304 protected:
305 
306     //! Constructor
307     explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
308         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
309         my_current_count = 0;
310         my_priority = a_priority;
311     }
312 
313     //! Copy constructor
314     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
315         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
316         my_current_count = 0;
317         my_priority = src.my_priority;
318     }
319 
320     //! Increments the trigger threshold
321     bool register_predecessor( predecessor_type & ) override {
322         spin_mutex::scoped_lock l(my_mutex);
323         ++my_predecessor_count;
324         return true;
325     }
326 
327     //! Decrements the trigger threshold
328     /** Does not check to see if the removal of the predecessor now makes the current count
329         exceed the new threshold.  So removing a predecessor while the graph is active can cause
330         unexpected results. */
331     bool remove_predecessor( predecessor_type & ) override {
332         spin_mutex::scoped_lock l(my_mutex);
333         --my_predecessor_count;
334         return true;
335     }
336 
337     //! The input type
338     typedef continue_msg input_type;
339 
340     //! The predecessor type for this node
341     typedef receiver<input_type>::predecessor_type predecessor_type;
342 
343     template< typename R, typename B > friend class run_and_put_task;
344     template<typename X, typename Y> friend class broadcast_cache;
345     template<typename X, typename Y> friend class round_robin_cache;
346     // execute body is supposed to be too small to create a task for.
347     graph_task* try_put_task( const input_type & ) override {
348         {
349             spin_mutex::scoped_lock l(my_mutex);
350             if ( ++my_current_count < my_predecessor_count )
351                 return SUCCESSFULLY_ENQUEUED;
352             else
353                 my_current_count = 0;
354         }
355         graph_task* res = execute();
356         return res? res : SUCCESSFULLY_ENQUEUED;
357     }
358 
359     spin_mutex my_mutex;
360     int my_predecessor_count;
361     int my_current_count;
362     int my_initial_predecessor_count;
363     node_priority_t my_priority;
364     // the friend declaration in the base class did not eliminate the "protected class"
365     // error in gcc 4.1.2
366     template<typename U, typename V> friend class limiter_node;
367 
368     virtual void reset_receiver( reset_flags f ) {
369         my_current_count = 0;
370         if (f & rf_clear_edges) {
371             my_predecessor_count = my_initial_predecessor_count;
372         }
373     }
374 
375     //! Does whatever should happen when the threshold is reached
376     /** This should be very fast or else spawn a task.  This is
377         called while the sender is blocked in the try_put(). */
378     virtual graph_task* execute() = 0;
379     template<typename TT, typename M> friend class successor_cache;
380     bool is_continue_receiver() override { return true; }
381 
382     node_priority_t priority() const override { return my_priority; }
383 }; // class continue_receiver
384 
385 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
386     template <typename K, typename T>
387     K key_from_message( const T &t ) {
388         return t.key();
389     }
390 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
391 
392 } // d1
393 } // detail
394 } // tbb
395 
396 #include "detail/_flow_graph_trace_impl.h"
397 #include "detail/_hash_compare.h"
398 
399 namespace tbb {
400 namespace detail {
401 namespace d1 {
402 
403 #include "detail/_flow_graph_body_impl.h"
404 #include "detail/_flow_graph_cache_impl.h"
405 #include "detail/_flow_graph_types_impl.h"
406 
407 using namespace graph_policy_namespace;
408 
409 template <typename C, typename N>
410 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
411 {
412     if (begin) current_node = my_graph->my_nodes;
413     //else it is an end iterator by default
414 }
415 
416 template <typename C, typename N>
417 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
418     __TBB_ASSERT(current_node, "graph_iterator at end");
419     return *operator->();
420 }
421 
//! Returns the node the iterator currently points to (null for an end iterator); no check is made
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}
426 
427 template <typename C, typename N>
428 void graph_iterator<C,N>::internal_forward() {
429     if (current_node) current_node = current_node->next;
430 }
431 
//! Constructs a graph with isolated task_group_context
/** The context is placement-constructed in storage obtained from r1::cache_aligned_allocate,
    and own_context is set so that the destructor destroys and frees it. The graph is marked
    active only as the last step. */
inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = true;
    cancelled = false;
    caught_exception = false;
    // placement new: storage comes from the cache-aligned allocator, not operator new
    my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
    fgt_graph(this);
    my_is_active = true;
}
442 
//! Constructs a graph that uses a caller-supplied task_group_context
/** Only the address of use_this_context is stored; own_context is false, so the destructor
    leaves the context alone. */
inline graph::graph(task_group_context& use_this_context) :
    my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = false;
    cancelled = false;
    caught_exception = false;
    fgt_graph(this);
    my_is_active = true;
}
452 
//! Waits for the graph to finish, then releases the context (if owned) and the task arena
inline graph::~graph() {
    wait_for_all();
    if (own_context) {
        // mirror of the placement new in the default constructor: destroy, then free the storage
        my_context->~task_group_context();
        r1::cache_aligned_deallocate(my_context);
    }
    delete my_task_arena;
}
461 
//! Reserves on the graph's wait context, then reports the reservation via fgt_reserve_wait
inline void graph::reserve_wait() {
    my_wait_context.reserve();
    fgt_reserve_wait(this);
}
466 
//! Reports the release via fgt_release_wait, then releases on the graph's wait context
/** Note the order is the reverse of reserve_wait(): the trace call comes first. */
inline void graph::release_wait() {
    fgt_release_wait(this);
    my_wait_context.release();
}
471 
472 inline void graph::register_node(graph_node *n) {
473     n->next = nullptr;
474     {
475         spin_mutex::scoped_lock lock(nodelist_mutex);
476         n->prev = my_nodes_last;
477         if (my_nodes_last) my_nodes_last->next = n;
478         my_nodes_last = n;
479         if (!my_nodes) my_nodes = n;
480     }
481 }
482 
483 inline void graph::remove_node(graph_node *n) {
484     {
485         spin_mutex::scoped_lock lock(nodelist_mutex);
486         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
487         if (n->prev) n->prev->next = n->next;
488         if (n->next) n->next->prev = n->prev;
489         if (my_nodes_last == n) my_nodes_last = n->prev;
490         if (my_nodes == n) my_nodes = n->next;
491     }
492     n->prev = n->next = nullptr;
493 }
494 
495 inline void graph::reset( reset_flags f ) {
496     // reset context
497     deactivate_graph(*this);
498 
499     my_context->reset();
500     cancelled = false;
501     caught_exception = false;
502     // reset all the nodes comprising the graph
503     for(iterator ii = begin(); ii != end(); ++ii) {
504         graph_node *my_p = &(*ii);
505         my_p->reset_node(f);
506     }
507     // Reattach the arena. Might be useful to run the graph in a particular task_arena
508     // while not limiting graph lifetime to a single task_arena::execute() call.
509     prepare_task_arena( /*reinit=*/true );
510     activate_graph(*this);
511 }
512 
//! Requests cancellation by calling cancel_group_execution() on the graph's task_group_context
inline void graph::cancel() {
    my_context->cancel_group_execution();
}
516 
517 inline graph::iterator graph::begin() { return iterator(this, true); }
518 
519 inline graph::iterator graph::end() { return iterator(this, false); }
520 
521 inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
522 
523 inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
524 
525 inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
526 
527 inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
528 
//! Binds the node to graph g and registers it in that graph's node list
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}
532 
//! Removes the node from its graph's node list
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
536 
537 #include "detail/_flow_graph_node_impl.h"
538 
539 
540 //! An executable node that acts as a source, i.e. it has no predecessors
541 
542 template < typename Output >
543     __TBB_requires(std::copyable<Output>)
544 class input_node : public graph_node, public sender< Output > {
545 public:
546     //! The type of the output message, which is complete
547     typedef Output output_type;
548 
549     //! The type of successors of this node
550     typedef typename sender<output_type>::successor_type successor_type;
551 
552     // Input node has no input type
553     typedef null_type input_type;
554 
555     //! Constructor for a node with a successor
556     template< typename Body >
557         __TBB_requires(input_node_body<Body, Output>)
558      __TBB_NOINLINE_SYM input_node( graph &g, Body body )
559          : graph_node(g), my_active(false)
560          , my_body( new input_body_leaf< output_type, Body>(body) )
561          , my_init_body( new input_body_leaf< output_type, Body>(body) )
562          , my_successors(this), my_reserved(false), my_has_cached_item(false)
563     {
564         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
565                            static_cast<sender<output_type> *>(this), this->my_body);
566     }
567 
568 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
569     template <typename Body, typename... Successors>
570         __TBB_requires(input_node_body<Body, Output>)
571     input_node( const node_set<order::preceding, Successors...>& successors, Body body )
572         : input_node(successors.graph_reference(), body)
573     {
574         make_edges(*this, successors);
575     }
576 #endif
577 
578     //! Copy constructor
579     __TBB_NOINLINE_SYM input_node( const input_node& src )
580         : graph_node(src.my_graph), sender<Output>()
581         , my_active(false)
582         , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
583         , my_successors(this), my_reserved(false), my_has_cached_item(false)
584     {
585         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
586                            static_cast<sender<output_type> *>(this), this->my_body);
587     }
588 
589     //! The destructor
590     ~input_node() { delete my_body; delete my_init_body; }
591 
592     //! Add a new successor to this node
593     bool register_successor( successor_type &r ) override {
594         spin_mutex::scoped_lock lock(my_mutex);
595         my_successors.register_successor(r);
596         if ( my_active )
597             spawn_put();
598         return true;
599     }
600 
601     //! Removes a successor from this node
602     bool remove_successor( successor_type &r ) override {
603         spin_mutex::scoped_lock lock(my_mutex);
604         my_successors.remove_successor(r);
605         return true;
606     }
607 
608     //! Request an item from the node
609     bool try_get( output_type &v ) override {
610         spin_mutex::scoped_lock lock(my_mutex);
611         if ( my_reserved )
612             return false;
613 
614         if ( my_has_cached_item ) {
615             v = my_cached_item;
616             my_has_cached_item = false;
617             return true;
618         }
619         // we've been asked to provide an item, but we have none.  enqueue a task to
620         // provide one.
621         if ( my_active )
622             spawn_put();
623         return false;
624     }
625 
626     //! Reserves an item.
627     bool try_reserve( output_type &v ) override {
628         spin_mutex::scoped_lock lock(my_mutex);
629         if ( my_reserved ) {
630             return false;
631         }
632 
633         if ( my_has_cached_item ) {
634             v = my_cached_item;
635             my_reserved = true;
636             return true;
637         } else {
638             return false;
639         }
640     }
641 
642     //! Release a reserved item.
643     /** true = item has been released and so remains in sender, dest must request or reserve future items */
644     bool try_release( ) override {
645         spin_mutex::scoped_lock lock(my_mutex);
646         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
647         my_reserved = false;
648         if(!my_successors.empty())
649             spawn_put();
650         return true;
651     }
652 
653     //! Consumes a reserved item
654     bool try_consume( ) override {
655         spin_mutex::scoped_lock lock(my_mutex);
656         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
657         my_reserved = false;
658         my_has_cached_item = false;
659         if ( !my_successors.empty() ) {
660             spawn_put();
661         }
662         return true;
663     }
664 
665     //! Activates a node that was created in the inactive state
666     void activate() {
667         spin_mutex::scoped_lock lock(my_mutex);
668         my_active = true;
669         if (!my_successors.empty())
670             spawn_put();
671     }
672 
673     template<typename Body>
674     Body copy_function_object() {
675         input_body<output_type> &body_ref = *this->my_body;
676         return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
677     }
678 
679 protected:
680 
681     //! resets the input_node to its initial state
682     void reset_node( reset_flags f) override {
683         my_active = false;
684         my_reserved = false;
685         my_has_cached_item = false;
686 
687         if(f & rf_clear_edges) my_successors.clear();
688         if(f & rf_reset_bodies) {
689             input_body<output_type> *tmp = my_init_body->clone();
690             delete my_body;
691             my_body = tmp;
692         }
693     }
694 
695 private:
696     spin_mutex my_mutex;
697     bool my_active;
698     input_body<output_type> *my_body;
699     input_body<output_type> *my_init_body;
700     broadcast_cache< output_type > my_successors;
701     bool my_reserved;
702     bool my_has_cached_item;
703     output_type my_cached_item;
704 
705     // used by apply_body_bypass, can invoke body of node.
706     bool try_reserve_apply_body(output_type &v) {
707         spin_mutex::scoped_lock lock(my_mutex);
708         if ( my_reserved ) {
709             return false;
710         }
711         if ( !my_has_cached_item ) {
712             flow_control control;
713 
714             fgt_begin_body( my_body );
715 
716             my_cached_item = (*my_body)(control);
717             my_has_cached_item = !control.is_pipeline_stopped;
718 
719             fgt_end_body( my_body );
720         }
721         if ( my_has_cached_item ) {
722             v = my_cached_item;
723             my_reserved = true;
724             return true;
725         } else {
726             return false;
727         }
728     }
729 
730     graph_task* create_put_task() {
731         small_object_allocator allocator{};
732         typedef input_node_task_bypass< input_node<output_type> > task_type;
733         graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
734         my_graph.reserve_wait();
735         return t;
736     }
737 
738     //! Spawns a task that applies the body
739     void spawn_put( ) {
740         if(is_graph_active(this->my_graph)) {
741             spawn_in_graph_arena(this->my_graph, *create_put_task());
742         }
743     }
744 
745     friend class input_node_task_bypass< input_node<output_type> >;
746     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
747     graph_task* apply_body_bypass( ) {
748         output_type v;
749         if ( !try_reserve_apply_body(v) )
750             return nullptr;
751 
752         graph_task *last_task = my_successors.try_put_task(v);
753         if ( last_task )
754             try_consume();
755         else
756             try_release();
757         return last_task;
758     }
759 };  // class input_node
760 
761 //! Implements a function node that supports Input -> Output
762 template<typename Input, typename Output = continue_msg, typename Policy = queueing>
763     __TBB_requires(std::default_initializable<Input> &&
764                    std::copy_constructible<Input> &&
765                    std::copy_constructible<Output>)
766 class function_node
767     : public graph_node
768     , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
769     , public function_output<Output>
770 {
771     typedef cache_aligned_allocator<Input> internals_allocator;
772 
773 public:
774     typedef Input input_type;
775     typedef Output output_type;
776     typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
777     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
778     typedef function_output<output_type> fOutput_type;
779     typedef typename input_impl_type::predecessor_type predecessor_type;
780     typedef typename fOutput_type::successor_type successor_type;
781 
782     using input_impl_type::my_predecessors;
783 
784     //! Constructor
785     // input_queue_type is allocated here, but destroyed in the function_input_base.
786     // TODO: pass the graph_buffer_policy to the function_input_base so it can all
787     // be done in one place.  This would be an interface-breaking change.
788     template< typename Body >
789         __TBB_requires(function_node_body<Body, Input, Output>)
790      __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
791                    Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
792         : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
793           fOutput_type(g) {
794         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
795                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
796     }
797 
798     template <typename Body>
799         __TBB_requires(function_node_body<Body, Input, Output>)
800     function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
801         : function_node(g, concurrency, body, Policy(), a_priority) {}
802 
803 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
804     template <typename Body, typename... Args>
805         __TBB_requires(function_node_body<Body, Input, Output>)
806     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
807                    Policy p = Policy(), node_priority_t a_priority = no_priority )
808         : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
809         make_edges_in_order(nodes, *this);
810     }
811 
812     template <typename Body, typename... Args>
813         __TBB_requires(function_node_body<Body, Input, Output>)
814     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
815         : function_node(nodes, concurrency, body, Policy(), a_priority) {}
816 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
817 
818     //! Copy constructor
819     __TBB_NOINLINE_SYM function_node( const function_node& src ) :
820         graph_node(src.my_graph),
821         input_impl_type(src),
822         fOutput_type(src.my_graph) {
823         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
824                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
825     }
826 
827 protected:
828     template< typename R, typename B > friend class run_and_put_task;
829     template<typename X, typename Y> friend class broadcast_cache;
830     template<typename X, typename Y> friend class round_robin_cache;
831     using input_impl_type::try_put_task;
832 
833     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
834 
835     void reset_node(reset_flags f) override {
836         input_impl_type::reset_function_input(f);
837         // TODO: use clear() instead.
838         if(f & rf_clear_edges) {
839             successors().clear();
840             my_predecessors.clear();
841         }
842         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
843         __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
844     }
845 
846 };  // class function_node
847 
848 //! implements a function node that supports Input -> (set of outputs)
849 // Output is a tuple of output types.
850 template<typename Input, typename Output, typename Policy = queueing>
851     __TBB_requires(std::default_initializable<Input> &&
852                    std::copy_constructible<Input>)
853 class multifunction_node :
854     public graph_node,
855     public multifunction_input
856     <
857         Input,
858         typename wrap_tuple_elements<
859             std::tuple_size<Output>::value,  // #elements in tuple
860             multifunction_output,  // wrap this around each element
861             Output // the tuple providing the types
862         >::type,
863         Policy,
864         cache_aligned_allocator<Input>
865     >
866 {
867     typedef cache_aligned_allocator<Input> internals_allocator;
868 
869 protected:
870     static const int N = std::tuple_size<Output>::value;
871 public:
872     typedef Input input_type;
873     typedef null_type output_type;
874     typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
875     typedef multifunction_input<
876         input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
877     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
878 private:
879     using input_impl_type::my_predecessors;
880 public:
881     template<typename Body>
882         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
883     __TBB_NOINLINE_SYM multifunction_node(
884         graph &g, size_t concurrency,
885         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
886     ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
887         fgt_multioutput_node_with_body<N>(
888             CODEPTR(), FLOW_MULTIFUNCTION_NODE,
889             &this->my_graph, static_cast<receiver<input_type> *>(this),
890             this->output_ports(), this->my_body
891         );
892     }
893 
894     template <typename Body>
895         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
896     __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
897         : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
898 
899 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
900     template <typename Body, typename... Args>
901         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
902     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
903                        Policy p = Policy(), node_priority_t a_priority = no_priority)
904         : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
905         make_edges_in_order(nodes, *this);
906     }
907 
908     template <typename Body, typename... Args>
909         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
910     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
911         : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
912 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
913 
914     __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
915         graph_node(other.my_graph), input_impl_type(other) {
916         fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
917                 &this->my_graph, static_cast<receiver<input_type> *>(this),
918                 this->output_ports(), this->my_body );
919     }
920 
921     // all the guts are in multifunction_input...
922 protected:
923     void reset_node(reset_flags f) override { input_impl_type::reset(f); }
924 };  // multifunction_node
925 
926 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
927 //  successors.  The node has unlimited concurrency, so it does not reject inputs.
928 template<typename TupleType>
929 class split_node : public graph_node, public receiver<TupleType> {
930     static const int N = std::tuple_size<TupleType>::value;
931     typedef receiver<TupleType> base_type;
932 public:
933     typedef TupleType input_type;
934     typedef typename wrap_tuple_elements<
935             N,  // #elements in tuple
936             multifunction_output,  // wrap this around each element
937             TupleType // the tuple providing the types
938         >::type  output_ports_type;
939 
940     __TBB_NOINLINE_SYM explicit split_node(graph &g)
941         : graph_node(g),
942           my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
943     {
944         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
945             static_cast<receiver<input_type> *>(this), this->output_ports());
946     }
947 
948 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
949     template <typename... Args>
950     __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
951         make_edges_in_order(nodes, *this);
952     }
953 #endif
954 
955     __TBB_NOINLINE_SYM split_node(const split_node& other)
956         : graph_node(other.my_graph), base_type(other),
957           my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
958     {
959         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
960             static_cast<receiver<input_type> *>(this), this->output_ports());
961     }
962 
963     output_ports_type &output_ports() { return my_output_ports; }
964 
965 protected:
966     graph_task *try_put_task(const TupleType& t) override {
967         // Sending split messages in parallel is not justified, as overheads would prevail.
968         // Also, we do not have successors here. So we just tell the task returned here is successful.
969         return emit_element<N>::emit_this(this->my_graph, t, output_ports());
970     }
971     void reset_node(reset_flags f) override {
972         if (f & rf_clear_edges)
973             clear_element<N>::clear_this(my_output_ports);
974 
975         __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
976     }
977     graph& graph_reference() const override {
978         return my_graph;
979     }
980 
981 private:
982     output_ports_type my_output_ports;
983 };
984 
985 //! Implements an executable node that supports continue_msg -> Output
986 template <typename Output, typename Policy = Policy<void> >
987     __TBB_requires(std::copy_constructible<Output>)
988 class continue_node : public graph_node, public continue_input<Output, Policy>,
989                       public function_output<Output> {
990 public:
991     typedef continue_msg input_type;
992     typedef Output output_type;
993     typedef continue_input<Output, Policy> input_impl_type;
994     typedef function_output<output_type> fOutput_type;
995     typedef typename input_impl_type::predecessor_type predecessor_type;
996     typedef typename fOutput_type::successor_type successor_type;
997 
998     //! Constructor for executable node with continue_msg -> Output
999     template <typename Body >
1000         __TBB_requires(continue_node_body<Body, Output>)
1001     __TBB_NOINLINE_SYM continue_node(
1002         graph &g,
1003         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1004     ) : graph_node(g), input_impl_type( g, body, a_priority ),
1005         fOutput_type(g) {
1006         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1007 
1008                                            static_cast<receiver<input_type> *>(this),
1009                                            static_cast<sender<output_type> *>(this), this->my_body );
1010     }
1011 
1012     template <typename Body>
1013         __TBB_requires(continue_node_body<Body, Output>)
1014     continue_node( graph& g, Body body, node_priority_t a_priority )
1015         : continue_node(g, body, Policy(), a_priority) {}
1016 
1017 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1018     template <typename Body, typename... Args>
1019         __TBB_requires(continue_node_body<Body, Output>)
1020     continue_node( const node_set<Args...>& nodes, Body body,
1021                    Policy p = Policy(), node_priority_t a_priority = no_priority )
1022         : continue_node(nodes.graph_reference(), body, p, a_priority ) {
1023         make_edges_in_order(nodes, *this);
1024     }
1025     template <typename Body, typename... Args>
1026         __TBB_requires(continue_node_body<Body, Output>)
1027     continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
1028         : continue_node(nodes, body, Policy(), a_priority) {}
1029 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1030 
1031     //! Constructor for executable node with continue_msg -> Output
1032     template <typename Body >
1033         __TBB_requires(continue_node_body<Body, Output>)
1034     __TBB_NOINLINE_SYM continue_node(
1035         graph &g, int number_of_predecessors,
1036         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
1037     ) : graph_node(g)
1038       , input_impl_type(g, number_of_predecessors, body, a_priority),
1039         fOutput_type(g) {
1040         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1041                                            static_cast<receiver<input_type> *>(this),
1042                                            static_cast<sender<output_type> *>(this), this->my_body );
1043     }
1044 
1045     template <typename Body>
1046         __TBB_requires(continue_node_body<Body, Output>)
1047     continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
1048         : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
1049 
1050 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1051     template <typename Body, typename... Args>
1052         __TBB_requires(continue_node_body<Body, Output>)
1053     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1054                    Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
1055         : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
1056         make_edges_in_order(nodes, *this);
1057     }
1058 
1059     template <typename Body, typename... Args>
1060         __TBB_requires(continue_node_body<Body, Output>)
1061     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
1062                    Body body, node_priority_t a_priority )
1063         : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
1064 #endif
1065 
1066     //! Copy constructor
1067     __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
1068         graph_node(src.my_graph), input_impl_type(src),
1069         function_output<Output>(src.my_graph) {
1070         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
1071                                            static_cast<receiver<input_type> *>(this),
1072                                            static_cast<sender<output_type> *>(this), this->my_body );
1073     }
1074 
1075 protected:
1076     template< typename R, typename B > friend class run_and_put_task;
1077     template<typename X, typename Y> friend class broadcast_cache;
1078     template<typename X, typename Y> friend class round_robin_cache;
1079     using input_impl_type::try_put_task;
1080     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
1081 
1082     void reset_node(reset_flags f) override {
1083         input_impl_type::reset_receiver(f);
1084         if(f & rf_clear_edges)successors().clear();
1085         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
1086     }
1087 };  // continue_node
1088 
1089 //! Forwards messages of type T to all successors
1090 template <typename T>
1091 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
1092 public:
1093     typedef T input_type;
1094     typedef T output_type;
1095     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1096     typedef typename sender<output_type>::successor_type successor_type;
1097 private:
1098     broadcast_cache<input_type> my_successors;
1099 public:
1100 
1101     __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
1102         fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
1103                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1104     }
1105 
1106 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1107     template <typename... Args>
1108     broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
1109         make_edges_in_order(nodes, *this);
1110     }
1111 #endif
1112 
1113     // Copy constructor
1114     __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
1115 
1116     //! Adds a successor
1117     bool register_successor( successor_type &r ) override {
1118         my_successors.register_successor( r );
1119         return true;
1120     }
1121 
1122     //! Removes s as a successor
1123     bool remove_successor( successor_type &r ) override {
1124         my_successors.remove_successor( r );
1125         return true;
1126     }
1127 
1128 protected:
1129     template< typename R, typename B > friend class run_and_put_task;
1130     template<typename X, typename Y> friend class broadcast_cache;
1131     template<typename X, typename Y> friend class round_robin_cache;
1132     //! build a task to run the successor if possible.  Default is old behavior.
1133     graph_task *try_put_task(const T& t) override {
1134         graph_task *new_task = my_successors.try_put_task(t);
1135         if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
1136         return new_task;
1137     }
1138 
1139     graph& graph_reference() const override {
1140         return my_graph;
1141     }
1142 
1143     void reset_node(reset_flags f) override {
1144         if (f&rf_clear_edges) {
1145            my_successors.clear();
1146         }
1147         __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
1148     }
1149 };  // broadcast_node
1150 
1151 //! Forwards messages in arbitrary order
1152 template <typename T>
1153 class buffer_node
1154     : public graph_node
1155     , public reservable_item_buffer< T, cache_aligned_allocator<T> >
1156     , public receiver<T>, public sender<T>
1157 {
1158     typedef cache_aligned_allocator<T> internals_allocator;
1159 
1160 public:
1161     typedef T input_type;
1162     typedef T output_type;
1163     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1164     typedef typename sender<output_type>::successor_type successor_type;
1165     typedef buffer_node<T> class_type;
1166 
1167 protected:
1168     typedef size_t size_type;
1169     round_robin_cache< T, null_rw_mutex > my_successors;
1170 
1171     friend class forward_task_bypass< class_type >;
1172 
1173     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1174     };
1175 
1176     // implements the aggregator_operation concept
1177     class buffer_operation : public aggregated_operation< buffer_operation > {
1178     public:
1179         char type;
1180         T* elem;
1181         graph_task* ltask;
1182         successor_type *r;
1183 
1184         buffer_operation(const T& e, op_type t) : type(char(t))
1185                                                   , elem(const_cast<T*>(&e)) , ltask(nullptr)
1186         {}
1187         buffer_operation(op_type t) : type(char(t)),  ltask(nullptr) {}
1188     };
1189 
1190     bool forwarder_busy;
1191     typedef aggregating_functor<class_type, buffer_operation> handler_type;
1192     friend class aggregating_functor<class_type, buffer_operation>;
1193     aggregator< handler_type, buffer_operation> my_aggregator;
1194 
1195     virtual void handle_operations(buffer_operation *op_list) {
1196         handle_operations_impl(op_list, this);
1197     }
1198 
1199     template<typename derived_type>
1200     void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1201         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1202 
1203         buffer_operation *tmp = nullptr;
1204         bool try_forwarding = false;
1205         while (op_list) {
1206             tmp = op_list;
1207             op_list = op_list->next;
1208             switch (tmp->type) {
1209             case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
1210             case rem_succ: internal_rem_succ(tmp); break;
1211             case req_item: internal_pop(tmp); break;
1212             case res_item: internal_reserve(tmp); break;
1213             case rel_res:  internal_release(tmp); try_forwarding = true; break;
1214             case con_res:  internal_consume(tmp); try_forwarding = true; break;
1215             case put_item: try_forwarding = internal_push(tmp); break;
1216             case try_fwd_task: internal_forward_task(tmp); break;
1217             }
1218         }
1219 
1220         derived->order();
1221 
1222         if (try_forwarding && !forwarder_busy) {
1223             if(is_graph_active(this->my_graph)) {
1224                 forwarder_busy = true;
1225                 typedef forward_task_bypass<class_type> task_type;
1226                 small_object_allocator allocator{};
1227                 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
1228                 my_graph.reserve_wait();
1229                 // tmp should point to the last item handled by the aggregator.  This is the operation
1230                 // the handling thread enqueued.  So modifying that record will be okay.
1231                 // TODO revamp: check that the issue is still present
1232                 // workaround for icc bug  (at least 12.0 and 13.0)
1233                 // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
1234                 //        argument types are: (graph, graph_task *, graph_task *)
1235                 graph_task *z = tmp->ltask;
1236                 graph &g = this->my_graph;
1237                 tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
1238             }
1239         }
1240     }  // handle_operations
1241 
1242     inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
1243         return op_data.ltask;
1244     }
1245 
1246     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1247         graph_task *ft = grab_forwarding_task(op_data);
1248         if(ft) {
1249             spawn_in_graph_arena(graph_reference(), *ft);
1250             return true;
1251         }
1252         return false;
1253     }
1254 
1255     //! This is executed by an enqueued task, the "forwarder"
1256     virtual graph_task *forward_task() {
1257         buffer_operation op_data(try_fwd_task);
1258         graph_task *last_task = nullptr;
1259         do {
1260             op_data.status = WAIT;
1261             op_data.ltask = nullptr;
1262             my_aggregator.execute(&op_data);
1263 
1264             // workaround for icc bug
1265             graph_task *xtask = op_data.ltask;
1266             graph& g = this->my_graph;
1267             last_task = combine_tasks(g, last_task, xtask);
1268         } while (op_data.status ==SUCCEEDED);
1269         return last_task;
1270     }
1271 
1272     //! Register successor
1273     virtual void internal_reg_succ(buffer_operation *op) {
1274         my_successors.register_successor(*(op->r));
1275         op->status.store(SUCCEEDED, std::memory_order_release);
1276     }
1277 
1278     //! Remove successor
1279     virtual void internal_rem_succ(buffer_operation *op) {
1280         my_successors.remove_successor(*(op->r));
1281         op->status.store(SUCCEEDED, std::memory_order_release);
1282     }
1283 
1284 private:
1285     void order() {}
1286 
1287     bool is_item_valid() {
1288         return this->my_item_valid(this->my_tail - 1);
1289     }
1290 
1291     void try_put_and_add_task(graph_task*& last_task) {
1292         graph_task *new_task = my_successors.try_put_task(this->back());
1293         if (new_task) {
1294             // workaround for icc bug
1295             graph& g = this->my_graph;
1296             last_task = combine_tasks(g, last_task, new_task);
1297             this->destroy_back();
1298         }
1299     }
1300 
1301 protected:
1302     //! Tries to forward valid items to successors
1303     virtual void internal_forward_task(buffer_operation *op) {
1304         internal_forward_task_impl(op, this);
1305     }
1306 
1307     template<typename derived_type>
1308     void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
1309         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
1310 
1311         if (this->my_reserved || !derived->is_item_valid()) {
1312             op->status.store(FAILED, std::memory_order_release);
1313             this->forwarder_busy = false;
1314             return;
1315         }
1316         // Try forwarding, giving each successor a chance
1317         graph_task* last_task = nullptr;
1318         size_type counter = my_successors.size();
1319         for (; counter > 0 && derived->is_item_valid(); --counter)
1320             derived->try_put_and_add_task(last_task);
1321 
1322         op->ltask = last_task;  // return task
1323         if (last_task && !counter) {
1324             op->status.store(SUCCEEDED, std::memory_order_release);
1325         }
1326         else {
1327             op->status.store(FAILED, std::memory_order_release);
1328             forwarder_busy = false;
1329         }
1330     }
1331 
1332     virtual bool internal_push(buffer_operation *op) {
1333         this->push_back(*(op->elem));
1334         op->status.store(SUCCEEDED, std::memory_order_release);
1335         return true;
1336     }
1337 
1338     virtual void internal_pop(buffer_operation *op) {
1339         if(this->pop_back(*(op->elem))) {
1340             op->status.store(SUCCEEDED, std::memory_order_release);
1341         }
1342         else {
1343             op->status.store(FAILED, std::memory_order_release);
1344         }
1345     }
1346 
1347     virtual void internal_reserve(buffer_operation *op) {
1348         if(this->reserve_front(*(op->elem))) {
1349             op->status.store(SUCCEEDED, std::memory_order_release);
1350         }
1351         else {
1352             op->status.store(FAILED, std::memory_order_release);
1353         }
1354     }
1355 
1356     virtual void internal_consume(buffer_operation *op) {
1357         this->consume_front();
1358         op->status.store(SUCCEEDED, std::memory_order_release);
1359     }
1360 
1361     virtual void internal_release(buffer_operation *op) {
1362         this->release_front();
1363         op->status.store(SUCCEEDED, std::memory_order_release);
1364     }
1365 
1366 public:
1367     //! Constructor
1368     __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
1369         : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
1370           sender<T>(), my_successors(this), forwarder_busy(false)
1371     {
1372         my_aggregator.initialize_handler(handler_type(this));
1373         fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
1374                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
1375     }
1376 
1377 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1378     template <typename... Args>
1379     buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
1380         make_edges_in_order(nodes, *this);
1381     }
1382 #endif
1383 
1384     //! Copy constructor
1385     __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
1386 
1387     //
1388     // message sender implementation
1389     //
1390 
1391     //! Adds a new successor.
1392     /** Adds successor r to the list of successors; may forward tasks.  */
1393     bool register_successor( successor_type &r ) override {
1394         buffer_operation op_data(reg_succ);
1395         op_data.r = &r;
1396         my_aggregator.execute(&op_data);
1397         (void)enqueue_forwarding_task(op_data);
1398         return true;
1399     }
1400 
1401     //! Removes a successor.
1402     /** Removes successor r from the list of successors.
1403         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
1404     bool remove_successor( successor_type &r ) override {
1405         // TODO revamp: investigate why full qualification is necessary here
1406         tbb::detail::d1::remove_predecessor(r, *this);
1407         buffer_operation op_data(rem_succ);
1408         op_data.r = &r;
1409         my_aggregator.execute(&op_data);
1410         // even though this operation does not cause a forward, if we are the handler, and
1411         // a forward is scheduled, we may be the first to reach this point after the aggregator,
1412         // and so should check for the task.
1413         (void)enqueue_forwarding_task(op_data);
1414         return true;
1415     }
1416 
1417     //! Request an item from the buffer_node
1418     /**  true = v contains the returned item<BR>
1419          false = no item has been returned */
1420     bool try_get( T &v ) override {
1421         buffer_operation op_data(req_item);
1422         op_data.elem = &v;
1423         my_aggregator.execute(&op_data);
1424         (void)enqueue_forwarding_task(op_data);
1425         return (op_data.status==SUCCEEDED);
1426     }
1427 
1428     //! Reserves an item.
1429     /**  false = no item can be reserved<BR>
1430          true = an item is reserved */
1431     bool try_reserve( T &v ) override {
1432         buffer_operation op_data(res_item);
1433         op_data.elem = &v;
1434         my_aggregator.execute(&op_data);
1435         (void)enqueue_forwarding_task(op_data);
1436         return (op_data.status==SUCCEEDED);
1437     }
1438 
1439     //! Release a reserved item.
1440     /**  true = item has been released and so remains in sender */
1441     bool try_release() override {
1442         buffer_operation op_data(rel_res);
1443         my_aggregator.execute(&op_data);
1444         (void)enqueue_forwarding_task(op_data);
1445         return true;
1446     }
1447 
1448     //! Consumes a reserved item.
1449     /** true = item is removed from sender and reservation removed */
1450     bool try_consume() override {
1451         buffer_operation op_data(con_res);
1452         my_aggregator.execute(&op_data);
1453         (void)enqueue_forwarding_task(op_data);
1454         return true;
1455     }
1456 
1457 protected:
1458 
1459     template< typename R, typename B > friend class run_and_put_task;
1460     template<typename X, typename Y> friend class broadcast_cache;
1461     template<typename X, typename Y> friend class round_robin_cache;
1462     //! receive an item, return a task *if possible
1463     graph_task *try_put_task(const T &t) override {
1464         buffer_operation op_data(t, put_item);
1465         my_aggregator.execute(&op_data);
1466         graph_task *ft = grab_forwarding_task(op_data);
1467         // sequencer_nodes can return failure (if an item has been previously inserted)
1468         // We have to spawn the returned task if our own operation fails.
1469 
1470         if(ft && op_data.status ==FAILED) {
1471             // we haven't succeeded queueing the item, but for some reason the
1472             // call returned a task (if another request resulted in a successful
1473             // forward this could happen.)  Queue the task and reset the pointer.
1474             spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
1475         }
1476         else if(!ft && op_data.status ==SUCCEEDED) {
1477             ft = SUCCESSFULLY_ENQUEUED;
1478         }
1479         return ft;
1480     }
1481 
1482     graph& graph_reference() const override {
1483         return my_graph;
1484     }
1485 
1486 protected:
1487     void reset_node( reset_flags f) override {
1488         reservable_item_buffer<T, internals_allocator>::reset();
1489         // TODO: just clear structures
1490         if (f&rf_clear_edges) {
1491             my_successors.clear();
1492         }
1493         forwarder_busy = false;
1494     }
1495 };  // buffer_node
1496 
1497 //! Forwards messages in FIFO order
1498 template <typename T>
1499 class queue_node : public buffer_node<T> {
1500 protected:
1501     typedef buffer_node<T> base_type;
1502     typedef typename base_type::size_type size_type;
1503     typedef typename base_type::buffer_operation queue_operation;
1504     typedef queue_node class_type;
1505 
1506 private:
1507     template<typename> friend class buffer_node;
1508 
1509     bool is_item_valid() {
1510         return this->my_item_valid(this->my_head);
1511     }
1512 
1513     void try_put_and_add_task(graph_task*& last_task) {
1514         graph_task *new_task = this->my_successors.try_put_task(this->front());
1515         if (new_task) {
1516             // workaround for icc bug
1517             graph& graph_ref = this->graph_reference();
1518             last_task = combine_tasks(graph_ref, last_task, new_task);
1519             this->destroy_front();
1520         }
1521     }
1522 
1523 protected:
1524     void internal_forward_task(queue_operation *op) override {
1525         this->internal_forward_task_impl(op, this);
1526     }
1527 
1528     void internal_pop(queue_operation *op) override {
1529         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
1530             op->status.store(FAILED, std::memory_order_release);
1531         }
1532         else {
1533             this->pop_front(*(op->elem));
1534             op->status.store(SUCCEEDED, std::memory_order_release);
1535         }
1536     }
1537     void internal_reserve(queue_operation *op) override {
1538         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
1539             op->status.store(FAILED, std::memory_order_release);
1540         }
1541         else {
1542             this->reserve_front(*(op->elem));
1543             op->status.store(SUCCEEDED, std::memory_order_release);
1544         }
1545     }
1546     void internal_consume(queue_operation *op) override {
1547         this->consume_front();
1548         op->status.store(SUCCEEDED, std::memory_order_release);
1549     }
1550 
1551 public:
1552     typedef T input_type;
1553     typedef T output_type;
1554     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1555     typedef typename sender<output_type>::successor_type successor_type;
1556 
1557     //! Constructor
1558     __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
1559         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1560                                  static_cast<receiver<input_type> *>(this),
1561                                  static_cast<sender<output_type> *>(this) );
1562     }
1563 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor from a node set
    /** Delegates to the graph constructor using nodes.graph_reference(), then
        calls make_edges_in_order(nodes, *this). */
    template <typename... Args>
    queue_node( const node_set<Args...>& nodes )
        : queue_node( nodes.graph_reference() )
    {
        make_edges_in_order( nodes, *this );
    }
#endif
1570 
1571     //! Copy constructor
1572     __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
1573         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
1574                                  static_cast<receiver<input_type> *>(this),
1575                                  static_cast<sender<output_type> *>(this) );
1576     }
1577 
1578 
1579 protected:
1580     void reset_node( reset_flags f) override {
1581         base_type::reset_node(f);
1582     }
1583 };  // queue_node
1584 
1585 //! Forwards messages in sequence order
1586 template <typename T>
1587     __TBB_requires(std::copyable<T>)
1588 class sequencer_node : public queue_node<T> {
1589     function_body< T, size_t > *my_sequencer;
1590     // my_sequencer should be a benign function and must be callable
1591     // from a parallel context.  Does this mean it needn't be reset?
1592 public:
1593     typedef T input_type;
1594     typedef T output_type;
1595     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1596     typedef typename sender<output_type>::successor_type successor_type;
1597 
1598     //! Constructor
1599     template< typename Sequencer >
1600         __TBB_requires(sequencer<Sequencer, T>)
1601     __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
1602         my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
1603         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1604                                  static_cast<receiver<input_type> *>(this),
1605                                  static_cast<sender<output_type> *>(this) );
1606     }
1607 
1608 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1609     template <typename Sequencer, typename... Args>
1610         __TBB_requires(sequencer<Sequencer, T>)
1611     sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
1612         : sequencer_node(nodes.graph_reference(), s) {
1613         make_edges_in_order(nodes, *this);
1614     }
1615 #endif
1616 
1617     //! Copy constructor
1618     __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
1619         my_sequencer( src.my_sequencer->clone() ) {
1620         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
1621                                  static_cast<receiver<input_type> *>(this),
1622                                  static_cast<sender<output_type> *>(this) );
1623     }
1624 
1625     //! Destructor
1626     ~sequencer_node() { delete my_sequencer; }
1627 
1628 protected:
1629     typedef typename buffer_node<T>::size_type size_type;
1630     typedef typename buffer_node<T>::buffer_operation sequencer_operation;
1631 
1632 private:
1633     bool internal_push(sequencer_operation *op) override {
1634         size_type tag = (*my_sequencer)(*(op->elem));
1635 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
1636         if (tag < this->my_head) {
1637             // have already emitted a message with this tag
1638             op->status.store(FAILED, std::memory_order_release);
1639             return false;
1640         }
1641 #endif
1642         // cannot modify this->my_tail now; the buffer would be inconsistent.
1643         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
1644 
1645         if (this->size(new_tail) > this->capacity()) {
1646             this->grow_my_array(this->size(new_tail));
1647         }
1648         this->my_tail = new_tail;
1649 
1650         const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
1651         op->status.store(res, std::memory_order_release);
1652         return res ==SUCCEEDED;
1653     }
1654 };  // sequencer_node
1655 
1656 //! Forwards messages in priority order
1657 template<typename T, typename Compare = std::less<T>>
1658 class priority_queue_node : public buffer_node<T> {
1659 public:
1660     typedef T input_type;
1661     typedef T output_type;
1662     typedef buffer_node<T> base_type;
1663     typedef priority_queue_node class_type;
1664     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1665     typedef typename sender<output_type>::successor_type successor_type;
1666 
1667     //! Constructor
1668     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1669         : buffer_node<T>(g), compare(comp), mark(0) {
1670         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1671                                  static_cast<receiver<input_type> *>(this),
1672                                  static_cast<sender<output_type> *>(this) );
1673     }
1674 
1675 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1676     template <typename... Args>
1677     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1678         : priority_queue_node(nodes.graph_reference(), comp) {
1679         make_edges_in_order(nodes, *this);
1680     }
1681 #endif
1682 
1683     //! Copy constructor
1684     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1685         : buffer_node<T>(src), mark(0)
1686     {
1687         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1688                                  static_cast<receiver<input_type> *>(this),
1689                                  static_cast<sender<output_type> *>(this) );
1690     }
1691 
1692 protected:
1693 
1694     void reset_node( reset_flags f) override {
1695         mark = 0;
1696         base_type::reset_node(f);
1697     }
1698 
1699     typedef typename buffer_node<T>::size_type size_type;
1700     typedef typename buffer_node<T>::item_type item_type;
1701     typedef typename buffer_node<T>::buffer_operation prio_operation;
1702 
1703     //! Tries to forward valid items to successors
1704     void internal_forward_task(prio_operation *op) override {
1705         this->internal_forward_task_impl(op, this);
1706     }
1707 
1708     void handle_operations(prio_operation *op_list) override {
1709         this->handle_operations_impl(op_list, this);
1710     }
1711 
1712     bool internal_push(prio_operation *op) override {
1713         prio_push(*(op->elem));
1714         op->status.store(SUCCEEDED, std::memory_order_release);
1715         return true;
1716     }
1717 
1718     void internal_pop(prio_operation *op) override {
1719         // if empty or already reserved, don't pop
1720         if ( this->my_reserved == true || this->my_tail == 0 ) {
1721             op->status.store(FAILED, std::memory_order_release);
1722             return;
1723         }
1724 
1725         *(op->elem) = prio();
1726         op->status.store(SUCCEEDED, std::memory_order_release);
1727         prio_pop();
1728 
1729     }
1730 
1731     // pops the highest-priority item, saves copy
1732     void internal_reserve(prio_operation *op) override {
1733         if (this->my_reserved == true || this->my_tail == 0) {
1734             op->status.store(FAILED, std::memory_order_release);
1735             return;
1736         }
1737         this->my_reserved = true;
1738         *(op->elem) = prio();
1739         reserved_item = *(op->elem);
1740         op->status.store(SUCCEEDED, std::memory_order_release);
1741         prio_pop();
1742     }
1743 
1744     void internal_consume(prio_operation *op) override {
1745         op->status.store(SUCCEEDED, std::memory_order_release);
1746         this->my_reserved = false;
1747         reserved_item = input_type();
1748     }
1749 
1750     void internal_release(prio_operation *op) override {
1751         op->status.store(SUCCEEDED, std::memory_order_release);
1752         prio_push(reserved_item);
1753         this->my_reserved = false;
1754         reserved_item = input_type();
1755     }
1756 
1757 private:
1758     template<typename> friend class buffer_node;
1759 
1760     void order() {
1761         if (mark < this->my_tail) heapify();
1762         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1763     }
1764 
1765     bool is_item_valid() {
1766         return this->my_tail > 0;
1767     }
1768 
1769     void try_put_and_add_task(graph_task*& last_task) {
1770         graph_task * new_task = this->my_successors.try_put_task(this->prio());
1771         if (new_task) {
1772             // workaround for icc bug
1773             graph& graph_ref = this->graph_reference();
1774             last_task = combine_tasks(graph_ref, last_task, new_task);
1775             prio_pop();
1776         }
1777     }
1778 
1779 private:
1780     Compare compare;
1781     size_type mark;
1782 
1783     input_type reserved_item;
1784 
1785     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
1786     bool prio_use_tail() {
1787         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1788         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1789     }
1790 
1791     // prio_push: checks that the item will fit, expand array if necessary, put at end
1792     void prio_push(const T &src) {
1793         if ( this->my_tail >= this->my_array_size )
1794             this->grow_my_array( this->my_tail + 1 );
1795         (void) this->place_item(this->my_tail, src);
1796         ++(this->my_tail);
1797         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1798     }
1799 
1800     // prio_pop: deletes highest priority item from the array, and if it is item
1801     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
1802     // and mark.  Assumes the array has already been tested for emptiness; no failure.
1803     void prio_pop()  {
1804         if (prio_use_tail()) {
1805             // there are newly pushed elements; last one higher than top
1806             // copy the data
1807             this->destroy_item(this->my_tail-1);
1808             --(this->my_tail);
1809             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1810             return;
1811         }
1812         this->destroy_item(0);
1813         if(this->my_tail > 1) {
1814             // push the last element down heap
1815             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
1816             this->move_item(0,this->my_tail - 1);
1817         }
1818         --(this->my_tail);
1819         if(mark > this->my_tail) --mark;
1820         if (this->my_tail > 1) // don't reheap for heap of size 1
1821             reheap();
1822         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1823     }
1824 
1825     const T& prio() {
1826         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1827     }
1828 
1829     // turn array into heap
1830     void heapify() {
1831         if(this->my_tail == 0) {
1832             mark = 0;
1833             return;
1834         }
1835         if (!mark) mark = 1;
1836         for (; mark<this->my_tail; ++mark) { // for each unheaped element
1837             size_type cur_pos = mark;
1838             input_type to_place;
1839             this->fetch_item(mark,to_place);
1840             do { // push to_place up the heap
1841                 size_type parent = (cur_pos-1)>>1;
1842                 if (!compare(this->get_my_item(parent), to_place))
1843                     break;
1844                 this->move_item(cur_pos, parent);
1845                 cur_pos = parent;
1846             } while( cur_pos );
1847             (void) this->place_item(cur_pos, to_place);
1848         }
1849     }
1850 
1851     // otherwise heapified array with new root element; rearrange to heap
1852     void reheap() {
1853         size_type cur_pos=0, child=1;
1854         while (child < mark) {
1855             size_type target = child;
1856             if (child+1<mark &&
1857                 compare(this->get_my_item(child),
1858                         this->get_my_item(child+1)))
1859                 ++target;
1860             // target now has the higher priority child
1861             if (compare(this->get_my_item(target),
1862                         this->get_my_item(cur_pos)))
1863                 break;
1864             // swap
1865             this->swap_items(cur_pos, target);
1866             cur_pos = target;
1867             child = (cur_pos<<1)+1;
1868         }
1869     }
1870 };  // priority_queue_node
1871 
1872 //! Forwards messages only if the threshold has not been reached
1873 /** This node forwards items until its threshold is reached.
1874     It contains no buffering.  If the downstream node rejects, the
1875     message is dropped. */
1876 template< typename T, typename DecrementType=continue_msg >
1877 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
1878 public:
1879     typedef T input_type;
1880     typedef T output_type;
1881     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1882     typedef typename sender<output_type>::successor_type successor_type;
1883     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
1884 
1885 private:
1886     size_t my_threshold;
1887     size_t my_count; // number of successful puts
1888     size_t my_tries; // number of active put attempts
1889     size_t my_future_decrement; // number of active decrement
1890     reservable_predecessor_cache< T, spin_mutex > my_predecessors;
1891     spin_mutex my_mutex;
1892     broadcast_cache< T > my_successors;
1893 
1894     //! The internal receiver< DecrementType > that adjusts the count
1895     threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
1896 
1897     graph_task* decrement_counter( long long delta ) {
1898         if ( delta > 0 && size_t(delta) > my_threshold ) {
1899             delta = my_threshold;
1900         }
1901 
1902         {
1903             spin_mutex::scoped_lock lock(my_mutex);
1904             if ( delta > 0 && size_t(delta) > my_count ) {
1905                 if( my_tries > 0 ) {
1906                     my_future_decrement += (size_t(delta) - my_count);
1907                 }
1908                 my_count = 0;
1909             }
1910             else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
1911                 my_count = my_threshold;
1912             }
1913             else {
1914                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
1915             }
1916             __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
1917         }
1918         return forward_task();
1919     }
1920 
1921     // Let threshold_regulator call decrement_counter()
1922     friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
1923 
1924     friend class forward_task_bypass< limiter_node<T,DecrementType> >;
1925 
1926     bool check_conditions() {  // always called under lock
1927         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
1928     }
1929 
1930     // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
1931     graph_task* forward_task() {
1932         input_type v;
1933         graph_task* rval = nullptr;
1934         bool reserved = false;
1935 
1936         {
1937             spin_mutex::scoped_lock lock(my_mutex);
1938             if ( check_conditions() )
1939                 ++my_tries;
1940             else
1941                 return nullptr;
1942         }
1943 
1944         //SUCCESS
1945         // if we can reserve and can put, we consume the reservation
1946         // we increment the count and decrement the tries
1947         if ( (my_predecessors.try_reserve(v)) == true ) {
1948             reserved = true;
1949             if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
1950                 {
1951                     spin_mutex::scoped_lock lock(my_mutex);
1952                     ++my_count;
1953                     if ( my_future_decrement ) {
1954                         if ( my_count > my_future_decrement ) {
1955                             my_count -= my_future_decrement;
1956                             my_future_decrement = 0;
1957                         }
1958                         else {
1959                             my_future_decrement -= my_count;
1960                             my_count = 0;
1961                         }
1962                     }
1963                     --my_tries;
1964                     my_predecessors.try_consume();
1965                     if ( check_conditions() ) {
1966                         if ( is_graph_active(this->my_graph) ) {
1967                             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1968                             small_object_allocator allocator{};
1969                             graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
1970                             my_graph.reserve_wait();
1971                             spawn_in_graph_arena(graph_reference(), *rtask);
1972                         }
1973                     }
1974                 }
1975                 return rval;
1976             }
1977         }
1978         //FAILURE
1979         //if we can't reserve, we decrement the tries
1980         //if we can reserve but can't put, we decrement the tries and release the reservation
1981         {
1982             spin_mutex::scoped_lock lock(my_mutex);
1983             --my_tries;
1984             if (reserved) my_predecessors.try_release();
1985             if ( check_conditions() ) {
1986                 if ( is_graph_active(this->my_graph) ) {
1987                     small_object_allocator allocator{};
1988                     typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
1989                     graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
1990                     my_graph.reserve_wait();
1991                     __TBB_ASSERT(!rval, "Have two tasks to handle");
1992                     return t;
1993                 }
1994             }
1995             return rval;
1996         }
1997     }
1998 
1999     void initialize() {
2000         fgt_node(
2001             CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
2002             static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
2003             static_cast<sender<output_type> *>(this)
2004         );
2005     }
2006 
2007 public:
2008     //! Constructor
2009     limiter_node(graph &g, size_t threshold)
2010         : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
2011         my_predecessors(this), my_successors(this), decrement(this)
2012     {
2013         initialize();
2014     }
2015 
2016 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2017     template <typename... Args>
2018     limiter_node(const node_set<Args...>& nodes, size_t threshold)
2019         : limiter_node(nodes.graph_reference(), threshold) {
2020         make_edges_in_order(nodes, *this);
2021     }
2022 #endif
2023 
2024     //! Copy constructor
2025     limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
2026 
2027     //! The interface for accessing internal receiver< DecrementType > that adjusts the count
2028     receiver<DecrementType>& decrementer() { return decrement; }
2029 
2030     //! Replace the current successor with this new successor
2031     bool register_successor( successor_type &r ) override {
2032         spin_mutex::scoped_lock lock(my_mutex);
2033         bool was_empty = my_successors.empty();
2034         my_successors.register_successor(r);
2035         //spawn a forward task if this is the only successor
2036         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
2037             if ( is_graph_active(this->my_graph) ) {
2038                 small_object_allocator allocator{};
2039                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2040                 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2041                 my_graph.reserve_wait();
2042                 spawn_in_graph_arena(graph_reference(), *t);
2043             }
2044         }
2045         return true;
2046     }
2047 
2048     //! Removes a successor from this node
2049     /** r.remove_predecessor(*this) is also called. */
2050     bool remove_successor( successor_type &r ) override {
2051         // TODO revamp: investigate why qualification is needed for remove_predecessor() call
2052         tbb::detail::d1::remove_predecessor(r, *this);
2053         my_successors.remove_successor(r);
2054         return true;
2055     }
2056 
2057     //! Adds src to the list of cached predecessors.
2058     bool register_predecessor( predecessor_type &src ) override {
2059         spin_mutex::scoped_lock lock(my_mutex);
2060         my_predecessors.add( src );
2061         if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
2062             small_object_allocator allocator{};
2063             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2064             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
2065             my_graph.reserve_wait();
2066             spawn_in_graph_arena(graph_reference(), *t);
2067         }
2068         return true;
2069     }
2070 
2071     //! Removes src from the list of cached predecessors.
2072     bool remove_predecessor( predecessor_type &src ) override {
2073         my_predecessors.remove( src );
2074         return true;
2075     }
2076 
2077 protected:
2078 
2079     template< typename R, typename B > friend class run_and_put_task;
2080     template<typename X, typename Y> friend class broadcast_cache;
2081     template<typename X, typename Y> friend class round_robin_cache;
2082     //! Puts an item to this receiver
2083     graph_task* try_put_task( const T &t ) override {
2084         {
2085             spin_mutex::scoped_lock lock(my_mutex);
2086             if ( my_count + my_tries >= my_threshold )
2087                 return nullptr;
2088             else
2089                 ++my_tries;
2090         }
2091 
2092         graph_task* rtask = my_successors.try_put_task(t);
2093         if ( !rtask ) {  // try_put_task failed.
2094             spin_mutex::scoped_lock lock(my_mutex);
2095             --my_tries;
2096             if (check_conditions() && is_graph_active(this->my_graph)) {
2097                 small_object_allocator allocator{};
2098                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
2099                 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
2100                 my_graph.reserve_wait();
2101             }
2102         }
2103         else {
2104             spin_mutex::scoped_lock lock(my_mutex);
2105             ++my_count;
2106             if ( my_future_decrement ) {
2107                 if ( my_count > my_future_decrement ) {
2108                     my_count -= my_future_decrement;
2109                     my_future_decrement = 0;
2110                 }
2111                 else {
2112                     my_future_decrement -= my_count;
2113                     my_count = 0;
2114                 }
2115             }
2116             --my_tries;
2117         }
2118         return rtask;
2119     }
2120 
2121     graph& graph_reference() const override { return my_graph; }
2122 
2123     void reset_node( reset_flags f ) override {
2124         my_count = 0;
2125         if ( f & rf_clear_edges ) {
2126             my_predecessors.clear();
2127             my_successors.clear();
2128         }
2129         else {
2130             my_predecessors.reset();
2131         }
2132         decrement.reset_receiver(f);
2133     }
2134 };  // limiter_node
2135 
2136 #include "detail/_flow_graph_join_impl.h"
2137 
2138 template<typename OutputTuple, typename JP=queueing> class join_node;
2139 
2140 template<typename OutputTuple>
2141 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2142 private:
2143     static const int N = std::tuple_size<OutputTuple>::value;
2144     typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2145 public:
2146     typedef OutputTuple output_type;
2147     typedef typename unfolded_type::input_ports_type input_ports_type;
2148      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2149         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2150                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2151     }
2152 
2153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2154     template <typename... Args>
2155     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2156         make_edges_in_order(nodes, *this);
2157     }
2158 #endif
2159 
2160     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2161         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2162                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2163     }
2164 
2165 };
2166 
2167 template<typename OutputTuple>
2168 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2169 private:
2170     static const int N = std::tuple_size<OutputTuple>::value;
2171     typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2172 public:
2173     typedef OutputTuple output_type;
2174     typedef typename unfolded_type::input_ports_type input_ports_type;
2175      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2176         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2177                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2178     }
2179 
2180 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2181     template <typename... Args>
2182     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2183         make_edges_in_order(nodes, *this);
2184     }
2185 #endif
2186 
2187     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2188         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2189                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2190     }
2191 
2192 };
2193 
#if __TBB_CPP20_CONCEPTS_PRESENT
// Declaration-only helper.  It is well-formed only when the number of function
// objects equals the size of OutputTuple and each Functions[i] satisfies
// join_node_function_object<Functions[i], std::tuple_element_t<i, OutputTuple>, K>.
template <typename OutputTuple, typename K, typename... Functions, std::size_t... Idx>
void join_node_function_objects_helper( std::index_sequence<Idx...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions))
          && (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);

// Satisfied when the helper can be called with one index per function object.
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(
        std::index_sequence_for<Functions...>{} );
};

#endif
2209 
2210 // template for key_matching join_node
2211 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2212 template<typename OutputTuple, typename K, typename KHash>
2213 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
2214       key_matching_port, OutputTuple, key_matching<K,KHash> > {
2215 private:
2216     static const int N = std::tuple_size<OutputTuple>::value;
2217     typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2218 public:
2219     typedef OutputTuple output_type;
2220     typedef typename unfolded_type::input_ports_type input_ports_type;
2221 
2222 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2223     join_node(graph &g) : unfolded_type(g) {}
2224 #endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2225 
2226     template<typename __TBB_B0, typename __TBB_B1>
2227         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
2228      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2229         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2230                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2231     }
2232     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2233         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
2234      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2235         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2236                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2237     }
2238     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
2239         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
2240      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2241         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2242                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2243     }
2244     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
2245         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
2246      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2247             unfolded_type(g, b0, b1, b2, b3, b4) {
2248         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2249                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2250     }
2251 #if __TBB_VARIADIC_MAX >= 6
2252     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
2253         typename __TBB_B5>
2254         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
2255      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2256             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2257         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2258                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2259     }
2260 #endif
#if __TBB_VARIADIC_MAX >= 7
    //! Seven-argument overload: forwards g and b0..b6, in order, to the unfolded_type base.
    /** After the base is built, the graph address, the input ports and this node viewed as
        sender<output_type> are passed to fgt_multiinput_node under FLOW_JOIN_NODE_TAG_MATCHING. */
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5,
                                 __TBB_B6 b6)
        : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6)
    {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                               this->input_ports(), as_sender);
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    //! Eight-argument overload: forwards g and b0..b7, in order, to the unfolded_type base.
    /** After the base is built, the graph address, the input ports and this node viewed as
        sender<output_type> are passed to fgt_multiinput_node under FLOW_JOIN_NODE_TAG_MATCHING. */
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5,
                                 __TBB_B6 b6, __TBB_B7 b7)
        : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7)
    {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                               this->input_ports(), as_sender);
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    //! Nine-argument overload: forwards g and b0..b8, in order, to the unfolded_type base.
    /** After the base is built, the graph address, the input ports and this node viewed as
        sender<output_type> are passed to fgt_multiinput_node under FLOW_JOIN_NODE_TAG_MATCHING. */
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5,
                                 __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
        : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8)
    {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                               this->input_ports(), as_sender);
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    //! Ten-argument overload: forwards g and b0..b9, in order, to the unfolded_type base.
    /** After the base is built, the graph address, the input ports and this node viewed as
        sender<output_type> are passed to fgt_multiinput_node under FLOW_JOIN_NODE_TAG_MATCHING. */
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
             typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
    __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5,
                                 __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
        : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9)
    {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                               this->input_ports(), as_sender);
    }
#endif
2301 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview overload taking a node_set instead of a graph reference.
    /** Delegates to the join_node(graph&, bodies...) constructor with nodes.graph_reference(),
        then calls make_edges_in_order(nodes, *this). The constraint accepts an empty Bodies
        pack, otherwise it requires join_node_functions for the given bodies. */
    template <
#if (__clang_major__ == 3 && __clang_minor__ == 4)
        // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
        template<typename...> class node_set,
#endif
        typename... Args, typename... Bodies
    >
    __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
        : join_node(nodes.graph_reference(), bodies...)
    {
        make_edges_in_order(nodes, *this);
    }
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2316 
2317     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2318         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2319                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2320     }
2321 
2322 };
2323 
2324 // indexer node
2325 #include "detail/_flow_graph_indexer_impl.h"
2326 
2327 // TODO: Implement interface with variadic template or tuple
2328 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2329                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
2330                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2331 
2332 //indexer node specializations
2333 template<typename T0>
2334 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2335 private:
2336     static const int N = 1;
2337 public:
2338     typedef std::tuple<T0> InputTuple;
2339     typedef tagged_msg<size_t, T0> output_type;
2340     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2341     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2342         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2343                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2344     }
2345 
2346 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2347     template <typename... Args>
2348     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2349         make_edges_in_order(nodes, *this);
2350     }
2351 #endif
2352 
2353     // Copy constructor
2354     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2355         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2356                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2357     }
2358 };
2359 
2360 template<typename T0, typename T1>
2361 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2362 private:
2363     static const int N = 2;
2364 public:
2365     typedef std::tuple<T0, T1> InputTuple;
2366     typedef tagged_msg<size_t, T0, T1> output_type;
2367     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2368     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2369         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2370                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2371     }
2372 
2373 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2374     template <typename... Args>
2375     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2376         make_edges_in_order(nodes, *this);
2377     }
2378 #endif
2379 
2380     // Copy constructor
2381     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2382         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2383                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2384     }
2385 
2386 };
2387 
2388 template<typename T0, typename T1, typename T2>
2389 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2390 private:
2391     static const int N = 3;
2392 public:
2393     typedef std::tuple<T0, T1, T2> InputTuple;
2394     typedef tagged_msg<size_t, T0, T1, T2> output_type;
2395     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2396     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2397         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2398                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2399     }
2400 
2401 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2402     template <typename... Args>
2403     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2404         make_edges_in_order(nodes, *this);
2405     }
2406 #endif
2407 
2408     // Copy constructor
2409     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2410         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2411                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2412     }
2413 
2414 };
2415 
2416 template<typename T0, typename T1, typename T2, typename T3>
2417 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2418 private:
2419     static const int N = 4;
2420 public:
2421     typedef std::tuple<T0, T1, T2, T3> InputTuple;
2422     typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2423     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2424     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2425         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2426                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2427     }
2428 
2429 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2430     template <typename... Args>
2431     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2432         make_edges_in_order(nodes, *this);
2433     }
2434 #endif
2435 
2436     // Copy constructor
2437     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2438         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2439                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2440     }
2441 
2442 };
2443 
2444 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2445 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2446 private:
2447     static const int N = 5;
2448 public:
2449     typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2450     typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2451     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2452     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2453         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2454                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2455     }
2456 
2457 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2458     template <typename... Args>
2459     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2460         make_edges_in_order(nodes, *this);
2461     }
2462 #endif
2463 
2464     // Copy constructor
2465     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2466         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2467                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2468     }
2469 
2470 };
2471 
#if __TBB_VARIADIC_MAX >= 6
//! indexer_node specialization for six input types.
/** The inputs are described by std::tuple<T0, ..., T5>; the output message type is
    tagged_msg<size_t, T0, ..., T5>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
    // Count of input types; used as the template argument of fgt_multiinput_node.
    static const int N = 6;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and passes it to fgt_multiinput_node as FLOW_INDEXER_NODE.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph referenced by nodes, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from other and makes the same fgt_multiinput_node call.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

};
#endif //variadic max 6
2501 
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization for seven input types.
/** The inputs are described by std::tuple<T0, ..., T6>; the output message type is
    tagged_msg<size_t, T0, ..., T6>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
    // Count of input types; used as the template argument of fgt_multiinput_node.
    static const int N = 7;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and passes it to fgt_multiinput_node as FLOW_INDEXER_NODE.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph referenced by nodes, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from other and makes the same fgt_multiinput_node call.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

};
#endif //variadic max 7
2532 
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node specialization for eight input types.
/** The inputs are described by std::tuple<T0, ..., T7>; the output message type is
    tagged_msg<size_t, T0, ..., T7>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    // Count of input types; used as the template argument of fgt_multiinput_node.
    static const int N = 8;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and passes it to fgt_multiinput_node as FLOW_INDEXER_NODE.
    /** Carries __TBB_NOINLINE_SYM like the matching constructor of every other indexer_node
        specialization; the marker expands to __attribute__((noinline)) only in profiling
        builds on Unix/macOS and to nothing otherwise (see its definition at the top of the file). */
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph referenced by nodes, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from other and makes the same fgt_multiinput_node call.
    /** Also marked __TBB_NOINLINE_SYM for consistency with the other specializations. */
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 8
2563 
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization for nine input types.
/** The inputs are described by std::tuple<T0, ..., T8>; the output message type is
    tagged_msg<size_t, T0, ..., T8>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
    // Count of input types; used as the template argument of fgt_multiinput_node.
    static const int N = 9;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and passes it to fgt_multiinput_node as FLOW_INDEXER_NODE.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph referenced by nodes, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from other and makes the same fgt_multiinput_node call.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

};
#endif //variadic max 9
2594 
#if __TBB_VARIADIC_MAX >= 10
//! Definition of the primary indexer_node template, used when all ten input types are given.
/** The inputs are described by std::tuple<T0, ..., T9>; the output message type is
    tagged_msg<size_t, T0, ..., T9>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
    // Count of input types; used as the template argument of fgt_multiinput_node.
    static const int N = 10;

public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and passes it to fgt_multiinput_node as FLOW_INDEXER_NODE.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph referenced by nodes, then calls make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copy-constructs the base from other and makes the same fgt_multiinput_node call.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast<sender<output_type>*>(this);
        fgt_multiinput_node<N>(CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                               this->input_ports(), as_sender);
    }

};
#endif //variadic max 10
2625 
2626 template< typename T >
2627 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
2628     register_successor(p, s);
2629     fgt_make_edge( &p, &s );
2630 }
2631 
2632 //! Makes an edge between a single predecessor and a single successor
2633 template< typename T >
2634 inline void make_edge( sender<T> &p, receiver<T> &s ) {
2635     internal_make_edge( p, s );
2636 }
2637 
//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
// The two defaulted template parameters restrict this overload to types that declare
// output_ports_type (T) and input_ports_type (V).
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port = std::get<0>(input.input_ports());
    make_edge(first_output_port, first_input_port);
}
2644 
2645 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
2646 template< typename T, typename R,
2647           typename = typename T::output_ports_type >
2648 inline void make_edge( T& output, receiver<R>& input) {
2649      make_edge(std::get<0>(output.output_ports()), input);
2650 }
2651 
2652 //Makes an edge from a sender to port 0 of a multi-input successor.
2653 template< typename S,  typename V,
2654           typename = typename V::input_ports_type >
2655 inline void make_edge( sender<S>& output, V& input) {
2656      make_edge(output, std::get<0>(input.input_ports()));
2657 }
2658 
2659 template< typename T >
2660 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2661     remove_successor( p, s );
2662     fgt_remove_edge( &p, &s );
2663 }
2664 
2665 //! Removes an edge between a single predecessor and a single successor
2666 template< typename T >
2667 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2668     internal_remove_edge( p, s );
2669 }
2670 
//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
// The two defaulted template parameters restrict this overload to types that declare
// output_ports_type (T) and input_ports_type (V).
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port = std::get<0>(input.input_ports());
    remove_edge(first_output_port, first_input_port);
}
2677 
2678 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
2679 template< typename T, typename R,
2680           typename = typename T::output_ports_type >
2681 inline void remove_edge( T& output, receiver<R>& input) {
2682      remove_edge(std::get<0>(output.output_ports()), input);
2683 }
2684 //Removes an edge between a sender and port 0 of a multi-input successor.
2685 template< typename S,  typename V,
2686           typename = typename V::input_ports_type >
2687 inline void remove_edge( sender<S>& output, V& input) {
2688      remove_edge(output, std::get<0>(input.input_ports()));
2689 }
2690 
//! Returns a copy of the body from a function or continue node
/** Body must be named explicitly by the caller; the result is whatever
    n.copy_function_object<Body>() returns. */
template< typename Body, typename Node >
Body copy_body( Node &n )
{
    return n.template copy_function_object<Body>();
}
2696 
2697 //composite_node
2698 template< typename InputTuple, typename OutputTuple > class composite_node;
2699 
2700 template< typename... InputTypes, typename... OutputTypes>
2701 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2702 
2703 public:
2704     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2705     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2706 
2707 private:
2708     std::unique_ptr<input_ports_type> my_input_ports;
2709     std::unique_ptr<output_ports_type> my_output_ports;
2710 
2711     static const size_t NUM_INPUTS = sizeof...(InputTypes);
2712     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2713 
2714 protected:
2715     void reset_node(reset_flags) override {}
2716 
2717 public:
2718     composite_node( graph &g ) : graph_node(g) {
2719         fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2720     }
2721 
2722     template<typename T1, typename T2>
2723     void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2724         static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2725         static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2726 
2727         fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2728         fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2729 
2730         my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
2731         my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
2732     }
2733 
2734     template< typename... NodeTypes >
2735     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2736 
2737     template< typename... NodeTypes >
2738     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2739 
2740 
2741     input_ports_type& input_ports() {
2742          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2743          return *my_input_ports;
2744     }
2745 
2746     output_ports_type& output_ports() {
2747          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2748          return *my_output_ports;
2749     }
2750 };  // class composite_node
2751 
2752 //composite_node with only input ports
2753 template< typename... InputTypes>
2754 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
2755 public:
2756     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2757 
2758 private:
2759     std::unique_ptr<input_ports_type> my_input_ports;
2760     static const size_t NUM_INPUTS = sizeof...(InputTypes);
2761 
2762 protected:
2763     void reset_node(reset_flags) override {}
2764 
2765 public:
2766     composite_node( graph &g ) : graph_node(g) {
2767         fgt_composite( CODEPTR(), this, &g );
2768     }
2769 
2770    template<typename T>
2771    void set_external_ports(T&& input_ports_tuple) {
2772        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2773 
2774        fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2775 
2776        my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
2777    }
2778 
2779     template< typename... NodeTypes >
2780     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2781 
2782     template< typename... NodeTypes >
2783     void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2784 
2785 
2786     input_ports_type& input_ports() {
2787          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2788          return *my_input_ports;
2789     }
2790 
2791 };  // class composite_node
2792 
2793 //composite_nodes with only output_ports
2794 template<typename... OutputTypes>
2795 class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
2796 public:
2797     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2798 
2799 private:
2800     std::unique_ptr<output_ports_type> my_output_ports;
2801     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2802 
2803 protected:
2804     void reset_node(reset_flags) override {}
2805 
2806 public:
2807     __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
2808         fgt_composite( CODEPTR(), this, &g );
2809     }
2810 
2811    template<typename T>
2812    void set_external_ports(T&& output_ports_tuple) {
2813        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2814 
2815        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2816 
2817        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
2818    }
2819 
2820     template<typename... NodeTypes >
2821     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2822 
2823     template<typename... NodeTypes >
2824     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2825 
2826 
2827     output_ports_type& output_ports() {
2828          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2829          return *my_output_ports;
2830     }
2831 
2832 };  // class composite_node
2833 
2834 template<typename Gateway>
2835 class async_body_base: no_assign {
2836 public:
2837     typedef Gateway gateway_type;
2838 
2839     async_body_base(gateway_type *gateway): my_gateway(gateway) { }
2840     void set_gateway(gateway_type *gateway) {
2841         my_gateway = gateway;
2842     }
2843 
2844 protected:
2845     gateway_type *my_gateway;
2846 };
2847 
2848 template<typename Input, typename Ports, typename Gateway, typename Body>
2849 class async_body: public async_body_base<Gateway> {
2850 private:
2851     Body my_body;
2852 
2853 public:
2854     typedef async_body_base<Gateway> base_type;
2855     typedef Gateway gateway_type;
2856 
2857     async_body(const Body &body, gateway_type *gateway)
2858         : base_type(gateway), my_body(body) { }
2859 
2860     void operator()( const Input &v, Ports & ) noexcept(noexcept(my_body(v, std::declval<gateway_type&>()))) {
2861         my_body(v, *this->my_gateway);
2862     }
2863 
2864     Body get_body() { return my_body; }
2865 };
2866 
2867 //! Implements async node
2868 template < typename Input, typename Output,
2869            typename Policy = queueing_lightweight >
2870     __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
2871 class async_node
2872     : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
2873 {
2874     typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
2875     typedef multifunction_input<
2876         Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
2877 
2878 public:
2879     typedef Input input_type;
2880     typedef Output output_type;
2881     typedef receiver<input_type> receiver_type;
2882     typedef receiver<output_type> successor_type;
2883     typedef sender<input_type> predecessor_type;
2884     typedef receiver_gateway<output_type> gateway_type;
2885     typedef async_body_base<gateway_type> async_body_base_type;
2886     typedef typename base_type::output_ports_type output_ports_type;
2887 
2888 private:
2889     class receiver_gateway_impl: public receiver_gateway<Output> {
2890     public:
2891         receiver_gateway_impl(async_node* node): my_node(node) {}
2892         void reserve_wait() override {
2893             fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
2894             my_node->my_graph.reserve_wait();
2895         }
2896 
2897         void release_wait() override {
2898             async_node* n = my_node;
2899             graph* g = &n->my_graph;
2900             g->release_wait();
2901             fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
2902         }
2903 
2904         //! Implements gateway_type::try_put for an external activity to submit a message to FG
2905         bool try_put(const Output &i) override {
2906             return my_node->try_put_impl(i);
2907         }
2908 
2909     private:
2910         async_node* my_node;
2911     } my_gateway;
2912 
2913     //The substitute of 'this' for member construction, to prevent compiler warnings
2914     async_node* self() { return this; }
2915 
2916     //! Implements gateway_type::try_put for an external activity to submit a message to FG
2917     bool try_put_impl(const Output &i) {
2918         multifunction_output<Output> &port_0 = output_port<0>(*this);
2919         broadcast_cache<output_type>& port_successors = port_0.successors();
2920         fgt_async_try_put_begin(this, &port_0);
2921         // TODO revamp: change to std::list<graph_task*>
2922         graph_task_list tasks;
2923         bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
2924         __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
2925                       "Return status is inconsistent with the method operation." );
2926 
2927         while( !tasks.empty() ) {
2928             enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
2929         }
2930         fgt_async_try_put_end(this, &port_0);
2931         return is_at_least_one_put_successful;
2932     }
2933 
2934 public:
2935     template<typename Body>
2936         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2937     __TBB_NOINLINE_SYM async_node(
2938         graph &g, size_t concurrency,
2939         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
2940     ) : base_type(
2941         g, concurrency,
2942         async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
2943         (body, &my_gateway), a_priority ), my_gateway(self()) {
2944         fgt_multioutput_node_with_body<1>(
2945             CODEPTR(), FLOW_ASYNC_NODE,
2946             &this->my_graph, static_cast<receiver<input_type> *>(this),
2947             this->output_ports(), this->my_body
2948         );
2949     }
2950 
2951     template <typename Body>
2952         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2953     __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
2954         : async_node(g, concurrency, body, Policy(), a_priority) {}
2955 
2956 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2957     template <typename Body, typename... Args>
2958         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2959     __TBB_NOINLINE_SYM async_node(
2960         const node_set<Args...>& nodes, size_t concurrency, Body body,
2961         Policy = Policy(), node_priority_t a_priority = no_priority )
2962         : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
2963         make_edges_in_order(nodes, *this);
2964     }
2965 
2966     template <typename Body, typename... Args>
2967         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2968     __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
2969         : async_node(nodes, concurrency, body, Policy(), a_priority) {}
2970 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2971 
2972     __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
2973         static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
2974         static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
2975 
2976         fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
2977                 &this->my_graph, static_cast<receiver<input_type> *>(this),
2978                 this->output_ports(), this->my_body );
2979     }
2980 
2981     gateway_type& gateway() {
2982         return my_gateway;
2983     }
2984 
2985     // Define sender< Output >
2986 
2987     //! Add a new successor to this node
2988     bool register_successor(successor_type&) override {
2989         __TBB_ASSERT(false, "Successors must be registered only via ports");
2990         return false;
2991     }
2992 
2993     //! Removes a successor from this node
2994     bool remove_successor(successor_type&) override {
2995         __TBB_ASSERT(false, "Successors must be removed only via ports");
2996         return false;
2997     }
2998 
2999     template<typename Body>
3000     Body copy_function_object() {
3001         typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
3002         typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
3003         mfn_body_type &body_ref = *this->my_body;
3004         async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
3005         return ab.get_body();
3006     }
3007 
3008 protected:
3009 
3010     void reset_node( reset_flags f) override {
3011        base_type::reset_node(f);
3012     }
3013 };
3014 
3015 #include "detail/_flow_graph_node_set_impl.h"
3016 
3017 template< typename T >
3018 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
3019 public:
3020     typedef T input_type;
3021     typedef T output_type;
3022     typedef typename receiver<input_type>::predecessor_type predecessor_type;
3023     typedef typename sender<output_type>::successor_type successor_type;
3024 
3025     __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
3026         : graph_node(g), my_successors(this), my_buffer_is_valid(false)
3027     {
3028         fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
3029                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
3030     }
3031 
3032 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3033     template <typename... Args>
3034     overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
3035         make_edges_in_order(nodes, *this);
3036     }
3037 #endif
3038 
3039     //! Copy constructor; doesn't take anything from src; default won't work
3040     __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
3041 
3042     ~overwrite_node() {}
3043 
3044     bool register_successor( successor_type &s ) override {
3045         spin_mutex::scoped_lock l( my_mutex );
3046         if (my_buffer_is_valid && is_graph_active( my_graph )) {
3047             // We have a valid value that must be forwarded immediately.
3048             bool ret = s.try_put( my_buffer );
3049             if ( ret ) {
3050                 // We add the successor that accepted our put
3051                 my_successors.register_successor( s );
3052             } else {
3053                 // In case of reservation a race between the moment of reservation and register_successor can appear,
3054                 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
3055                 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
3056                 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
3057                 small_object_allocator allocator{};
3058                 typedef register_predecessor_task task_type;
3059                 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
3060                 graph_reference().reserve_wait();
3061                 spawn_in_graph_arena( my_graph, *t );
3062             }
3063         } else {
3064             // No valid value yet, just add as successor
3065             my_successors.register_successor( s );
3066         }
3067         return true;
3068     }
3069 
3070     bool remove_successor( successor_type &s ) override {
3071         spin_mutex::scoped_lock l( my_mutex );
3072         my_successors.remove_successor(s);
3073         return true;
3074     }
3075 
3076     bool try_get( input_type &v ) override {
3077         spin_mutex::scoped_lock l( my_mutex );
3078         if ( my_buffer_is_valid ) {
3079             v = my_buffer;
3080             return true;
3081         }
3082         return false;
3083     }
3084 
3085     //! Reserves an item
3086     bool try_reserve( T &v ) override {
3087         return try_get(v);
3088     }
3089 
3090     //! Releases the reserved item
3091     bool try_release() override { return true; }
3092 
3093     //! Consumes the reserved item
3094     bool try_consume() override { return true; }
3095 
3096     bool is_valid() {
3097        spin_mutex::scoped_lock l( my_mutex );
3098        return my_buffer_is_valid;
3099     }
3100 
3101     void clear() {
3102        spin_mutex::scoped_lock l( my_mutex );
3103        my_buffer_is_valid = false;
3104     }
3105 
3106 protected:
3107 
3108     template< typename R, typename B > friend class run_and_put_task;
3109     template<typename X, typename Y> friend class broadcast_cache;
3110     template<typename X, typename Y> friend class round_robin_cache;
3111     graph_task* try_put_task( const input_type &v ) override {
3112         spin_mutex::scoped_lock l( my_mutex );
3113         return try_put_task_impl(v);
3114     }
3115 
3116     graph_task * try_put_task_impl(const input_type &v) {
3117         my_buffer = v;
3118         my_buffer_is_valid = true;
3119         graph_task* rtask = my_successors.try_put_task(v);
3120         if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
3121         return rtask;
3122     }
3123 
3124     graph& graph_reference() const override {
3125         return my_graph;
3126     }
3127 
3128     //! Breaks an infinite loop between the node reservation and register_successor call
3129     struct register_predecessor_task : public graph_task {
3130         register_predecessor_task(
3131             graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
3132             : graph_task(g, allocator), o(owner), s(succ) {};
3133 
3134         task* execute(execution_data& ed) override {
3135             // TODO revamp: investigate why qualification is needed for register_successor() call
3136             using tbb::detail::d1::register_predecessor;
3137             using tbb::detail::d1::register_successor;
3138             if ( !register_predecessor(s, o) ) {
3139                 register_successor(o, s);
3140             }
3141             finalize<register_predecessor_task>(ed);
3142             return nullptr;
3143         }
3144 
3145         task* cancel(execution_data& ed) override {
3146             finalize<register_predecessor_task>(ed);
3147             return nullptr;
3148         }
3149 
3150         predecessor_type& o;
3151         successor_type& s;
3152     };
3153 
3154     spin_mutex my_mutex;
3155     broadcast_cache< input_type, null_rw_mutex > my_successors;
3156     input_type my_buffer;
3157     bool my_buffer_is_valid;
3158 
3159     void reset_node( reset_flags f) override {
3160         my_buffer_is_valid = false;
3161        if (f&rf_clear_edges) {
3162            my_successors.clear();
3163        }
3164     }
3165 };  // overwrite_node
3166 
3167 template< typename T >
3168 class write_once_node : public overwrite_node<T> {
3169 public:
3170     typedef T input_type;
3171     typedef T output_type;
3172     typedef overwrite_node<T> base_type;
3173     typedef typename receiver<input_type>::predecessor_type predecessor_type;
3174     typedef typename sender<output_type>::successor_type successor_type;
3175 
3176     //! Constructor
3177     __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3178         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3179                                  static_cast<receiver<input_type> *>(this),
3180                                  static_cast<sender<output_type> *>(this) );
3181     }
3182 
3183 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3184     template <typename... Args>
3185     write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3186         make_edges_in_order(nodes, *this);
3187     }
3188 #endif
3189 
3190     //! Copy constructor: call base class copy constructor
3191     __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3192         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3193                                  static_cast<receiver<input_type> *>(this),
3194                                  static_cast<sender<output_type> *>(this) );
3195     }
3196 
3197 protected:
3198     template< typename R, typename B > friend class run_and_put_task;
3199     template<typename X, typename Y> friend class broadcast_cache;
3200     template<typename X, typename Y> friend class round_robin_cache;
3201     graph_task *try_put_task( const T &v ) override {
3202         spin_mutex::scoped_lock l( this->my_mutex );
3203         return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
3204     }
3205 }; // write_once_node
3206 
3207 inline void set_name(const graph& g, const char *name) {
3208     fgt_graph_desc(&g, name);
3209 }
3210 
3211 template <typename Output>
3212 inline void set_name(const input_node<Output>& node, const char *name) {
3213     fgt_node_desc(&node, name);
3214 }
3215 
3216 template <typename Input, typename Output, typename Policy>
3217 inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
3218     fgt_node_desc(&node, name);
3219 }
3220 
3221 template <typename Output, typename Policy>
3222 inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
3223     fgt_node_desc(&node, name);
3224 }
3225 
3226 template <typename T>
3227 inline void set_name(const broadcast_node<T>& node, const char *name) {
3228     fgt_node_desc(&node, name);
3229 }
3230 
3231 template <typename T>
3232 inline void set_name(const buffer_node<T>& node, const char *name) {
3233     fgt_node_desc(&node, name);
3234 }
3235 
3236 template <typename T>
3237 inline void set_name(const queue_node<T>& node, const char *name) {
3238     fgt_node_desc(&node, name);
3239 }
3240 
3241 template <typename T>
3242 inline void set_name(const sequencer_node<T>& node, const char *name) {
3243     fgt_node_desc(&node, name);
3244 }
3245 
3246 template <typename T, typename Compare>
3247 inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
3248     fgt_node_desc(&node, name);
3249 }
3250 
3251 template <typename T, typename DecrementType>
3252 inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
3253     fgt_node_desc(&node, name);
3254 }
3255 
3256 template <typename OutputTuple, typename JP>
3257 inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
3258     fgt_node_desc(&node, name);
3259 }
3260 
3261 template <typename... Types>
3262 inline void set_name(const indexer_node<Types...>& node, const char *name) {
3263     fgt_node_desc(&node, name);
3264 }
3265 
3266 template <typename T>
3267 inline void set_name(const overwrite_node<T>& node, const char *name) {
3268     fgt_node_desc(&node, name);
3269 }
3270 
3271 template <typename T>
3272 inline void set_name(const write_once_node<T>& node, const char *name) {
3273     fgt_node_desc(&node, name);
3274 }
3275 
3276 template<typename Input, typename Output, typename Policy>
3277 inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
3278     fgt_multioutput_node_desc(&node, name);
3279 }
3280 
3281 template<typename TupleType>
3282 inline void set_name(const split_node<TupleType>& node, const char *name) {
3283     fgt_multioutput_node_desc(&node, name);
3284 }
3285 
3286 template< typename InputTuple, typename OutputTuple >
3287 inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
3288     fgt_multiinput_multioutput_node_desc(&node, name);
3289 }
3290 
3291 template<typename Input, typename Output, typename Policy>
3292 inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
3293 {
3294     fgt_multioutput_node_desc(&node, name);
3295 }
3296 } // d1
3297 } // detail
3298 } // tbb
3299 
3300 
3301 // Include deduction guides for node classes
3302 #include "detail/_flow_graph_nodes_deduction.h"
3303 
3304 namespace tbb {
3305 namespace flow {
3306 inline namespace v1 {
3307     using detail::d1::receiver;
3308     using detail::d1::sender;
3309 
3310     using detail::d1::serial;
3311     using detail::d1::unlimited;
3312 
3313     using detail::d1::reset_flags;
3314     using detail::d1::rf_reset_protocol;
3315     using detail::d1::rf_reset_bodies;
3316     using detail::d1::rf_clear_edges;
3317 
3318     using detail::d1::graph;
3319     using detail::d1::graph_node;
3320     using detail::d1::continue_msg;
3321 
3322     using detail::d1::input_node;
3323     using detail::d1::function_node;
3324     using detail::d1::multifunction_node;
3325     using detail::d1::split_node;
3326     using detail::d1::output_port;
3327     using detail::d1::indexer_node;
3328     using detail::d1::tagged_msg;
3329     using detail::d1::cast_to;
3330     using detail::d1::is_a;
3331     using detail::d1::continue_node;
3332     using detail::d1::overwrite_node;
3333     using detail::d1::write_once_node;
3334     using detail::d1::broadcast_node;
3335     using detail::d1::buffer_node;
3336     using detail::d1::queue_node;
3337     using detail::d1::sequencer_node;
3338     using detail::d1::priority_queue_node;
3339     using detail::d1::limiter_node;
3340     using namespace detail::d1::graph_policy_namespace;
3341     using detail::d1::join_node;
3342     using detail::d1::input_port;
3343     using detail::d1::copy_body;
3344     using detail::d1::make_edge;
3345     using detail::d1::remove_edge;
3346     using detail::d1::tag_value;
3347     using detail::d1::composite_node;
3348     using detail::d1::async_node;
3349     using detail::d1::node_priority_t;
3350     using detail::d1::no_priority;
3351 
3352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3353     using detail::d1::follows;
3354     using detail::d1::precedes;
3355     using detail::d1::make_node_set;
3356     using detail::d1::make_edges;
3357 #endif
3358 
3359 } // v1
3360 } // flow
3361 
3362     using detail::d1::flow_control;
3363 
3364 namespace profiling {
3365     using detail::d1::set_name;
3366 } // profiling
3367 
3368 } // tbb
3369 
3370 
3371 #if TBB_USE_PROFILING_TOOLS  && ( __unix__ || __APPLE__ )
3372    // We don't do pragma pop here, since it still gives warning on the USER side
3373    #undef __TBB_NOINLINE_SYM
3374 #endif
3375 
3376 #endif // __TBB_flow_graph_H
3377