xref: /oneTBB/include/oneapi/tbb/flow_graph.h (revision bb45f092)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_H
18 #define __TBB_flow_graph_H
19 
20 #include <atomic>
21 #include <memory>
22 #include <type_traits>
23 
24 #include "detail/_config.h"
25 #include "detail/_namespace_injection.h"
26 #include "spin_mutex.h"
27 #include "null_mutex.h"
28 #include "spin_rw_mutex.h"
29 #include "null_rw_mutex.h"
30 #include "detail/_pipeline_filters.h"
31 #include "detail/_task.h"
32 #include "detail/_small_object_pool.h"
33 #include "cache_aligned_allocator.h"
34 #include "detail/_exception.h"
35 #include "detail/_template_helpers.h"
36 #include "detail/_aggregator.h"
37 #include "detail/_allocator_traits.h"
38 #include "detail/_utils.h"
39 #include "profiling.h"
40 #include "task_arena.h"
41 
42 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
43    #if __INTEL_COMPILER
44        // Disabled warning "routine is both inline and noinline"
45        #pragma warning (push)
46        #pragma warning( disable: 2196 )
47    #endif
48    #define __TBB_NOINLINE_SYM __attribute__((noinline))
49 #else
50    #define __TBB_NOINLINE_SYM
51 #endif
52 
53 #include <tuple>
54 #include <list>
55 #include <queue>
56 #if __TBB_CPP20_CONCEPTS_PRESENT
57 #include <concepts>
58 #endif
59 
/** @file
  \brief The graph related classes and functions

  There are some applications that best express dependencies as messages
  passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
  class and its associated node classes can be used to express such
  applications.
*/
69 
70 namespace tbb {
71 namespace detail {
72 
73 namespace d1 {
74 
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };
77 
//! A generic null type
// Used where a type parameter is required but carries no data (e.g. the
// input_type of nodes that have no input).
struct null_type {};
80 
//! An empty class used for messages that mean "I'm done"
// Carries no payload; receipt of the message alone is the signal.
class continue_msg {};
83 
84 } // namespace d1
85 
86 #if __TBB_CPP20_CONCEPTS_PRESENT
87 namespace d0 {
88 
// Constrains the value returned by a node body: the node's declared Output
// must either be continue_msg (pure signaling) or match the body's actual
// return type.
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
                                std::same_as<OutputType, ReturnType>;

// Body callable for continue_node: copy-constructible and invocable with a
// continue_msg, producing the node's output type.
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };

// Body callable for function_node: invocable with the input type, producing
// the output type.
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const Input& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };

// Key-extraction functor used by key-matching join_node ports: maps an input
// message to its matching key.
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
                                    requires( FunctionObject& func, const Input& v ) {
                                        { func(v) } -> adaptive_same_as<Key>;
                                    };

// Body callable for input_node: takes a flow_control (used to signal end of
// input) and produces the output type.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, tbb::detail::d1::flow_control& fc ) {
                              { body(fc) } -> adaptive_same_as<Output>;
                          };

// Body callable for multifunction_node: consumes an input and writes any
// results through the node's output ports; no return value is required.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
                                  requires( Body& body, const Input& v, OutputPortsType& p ) {
                                      body(v, p);
                                  };

// Sequencer functor: maps a value to its sequence number.
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
                    requires( Sequencer& seq, const Value& value ) {
                        { seq(value) } -> adaptive_same_as<std::size_t>;
                    };

// Body callable for async_node: consumes an input and submits work through
// the gateway; no return value is required.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, const Input& v, GatewayType& gateway ) {
                              body(v, gateway);
                          };
134 
135 } // namespace d0
136 #endif // __TBB_CPP20_CONCEPTS_PRESENT
137 
138 namespace d1 {
139 
140 //! Forward declaration section
141 template< typename T > class sender;
142 template< typename T > class receiver;
143 class continue_receiver;
144 
145 template< typename T, typename U > class limiter_node;  // needed for resetting decrementer
146 
147 template<typename T, typename M> class successor_cache;
148 template<typename T, typename M> class broadcast_cache;
149 template<typename T, typename M> class round_robin_cache;
150 template<typename T, typename M> class predecessor_cache;
151 template<typename T, typename M> class reservable_predecessor_cache;
152 
153 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
154 namespace order {
155 struct following;
156 struct preceding;
157 }
158 template<typename Order, typename... Args> struct node_set;
159 #endif
160 
161 
162 } // namespace d1
163 } // namespace detail
164 } // namespace tbb
165 
166 //! The graph class
167 #include "detail/_flow_graph_impl.h"
168 
169 namespace tbb {
170 namespace detail {
171 namespace d1 {
172 
173 static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
174     if (second->priority > first->priority)
175         return std::make_pair(second, first);
176     return std::make_pair(first, second);
177 }
178 
179 // submit task if necessary. Returns the non-enqueued task if there is one.
180 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
181     // if no RHS task, don't change left.
182     if (right == nullptr) return left;
183     // right != nullptr
184     if (left == nullptr) return right;
185     if (left == SUCCESSFULLY_ENQUEUED) return right;
186     // left contains a task
187     if (right != SUCCESSFULLY_ENQUEUED) {
188         // both are valid tasks
189         auto tasks_pair = order_tasks(left, right);
190         spawn_in_graph_arena(g, *tasks_pair.first);
191         return tasks_pair.second;
192     }
193     return left;
194 }
195 
//! Pure virtual template class that defines a sender of messages of type T
template< typename T >
class sender {
public:
    virtual ~sender() {}

    //! Request an item from the sender
    // Default refuses; a concrete sender overrides this if it can hand out
    // items on demand.
    virtual bool try_get( T & ) { return false; }

    //! Reserves an item in the sender
    // A reserved item stays in the sender until try_consume/try_release.
    virtual bool try_reserve( T & ) { return false; }

    //! Releases the reserved item
    // The item remains in the sender and may be obtained again later.
    virtual bool try_release( ) { return false; }

    //! Consumes the reserved item
    // The reserved item is removed from the sender.
    virtual bool try_consume( ) { return false; }

protected:
    //! The output type of this sender
    typedef T output_type;

    //! The successor type for this node
    typedef receiver<T> successor_type;

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;

    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;

    // The free functions below are the public way to make/break edges; they
    // are befriended so they can reach the protected virtuals.
    template<typename C>
    friend bool register_successor(sender<C>& s, receiver<C>& r);

    template<typename C>
    friend bool remove_successor  (sender<C>& s, receiver<C>& r);
};  // class sender<T>
233 
//! Connects sender s to receiver r through the sender's protected virtual.
template<typename C>
bool register_successor(sender<C>& s, receiver<C>& r) {
    return s.register_successor(r);
}

//! Disconnects receiver r from sender s.
template<typename C>
bool remove_successor(sender<C>& s, receiver<C>& r) {
    return s.remove_successor(r);
}
243 
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
public:
    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    // Delegates to try_put_task(); if that returns a task that still needs to
    // run (i.e. not the SUCCESSFULLY_ENQUEUED sentinel), spawns it in the
    // graph's arena.  Returns false only if the item was rejected.
    bool try_put( const T& t ) {
        graph_task *res = try_put_task(t);
        if (!res) return false;
        if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    //! put item to successor; return task to run the successor if possible.
protected:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class broadcast_cache;
    template< typename X, typename Y > friend class round_robin_cache;
    // Core acceptance hook: returns nullptr on rejection, SUCCESSFULLY_ENQUEUED
    // when handled in place, or a task the caller must spawn/run.
    virtual graph_task *try_put_task(const T& t) = 0;
    virtual graph& graph_reference() const = 0;

    template<typename TT, typename M> friend class successor_cache;
    virtual bool is_continue_receiver() { return false; }

    // TODO revamp: reconsider the inheritance and move node priority out of receiver
    virtual node_priority_t priority() const { return no_priority; }

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

    template <typename C>
    friend bool register_predecessor(receiver<C>& r, sender<C>& s);
    template <typename C>
    friend bool remove_predecessor  (receiver<C>& r, sender<C>& s);
}; // class receiver<T>
290 
//! Registers sender s as a predecessor of receiver r.
template <typename C>
bool register_predecessor(receiver<C>& r, sender<C>& s) {
    return r.register_predecessor(s);
}

//! Removes sender s from the predecessors of receiver r.
template <typename C>
bool remove_predecessor(receiver<C>& r, sender<C>& s) {
    return r.remove_predecessor(s);
}
300 
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on */
class continue_receiver : public receiver< continue_msg > {
protected:

    //! Constructor
    // The receiver fires once it has seen number_of_predecessors messages.
    explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
        my_priority = a_priority;
    }

    //! Copy constructor
    // Copies the initial threshold, not the live one; the running count
    // starts over at zero.
    continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
        my_priority = src.my_priority;
    }

    //! Increments the trigger threshold
    bool register_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }

    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold.  So removing a predecessor while the graph is active can cause
        unexpected results. */
    bool remove_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }

    //! The input type
    typedef continue_msg input_type;

    //! The predecessor type for this node
    typedef receiver<input_type>::predecessor_type predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    // execute body is supposed to be too small to create a task for.
    graph_task* try_put_task( const input_type & ) override {
        {
            spin_mutex::scoped_lock l(my_mutex);
            // Not all predecessors have signaled yet: absorb the message.
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;
            else
                my_current_count = 0; // threshold reached: auto-reset for the next round
        }
        // Run the derived node's action outside the lock.
        graph_task* res = execute();
        return res? res : SUCCESSFULLY_ENQUEUED;
    }

    spin_mutex my_mutex;              // protects the three counters below
    int my_predecessor_count;         // current trigger threshold
    int my_current_count;             // messages seen in the current round
    int my_initial_predecessor_count; // threshold restored on reset
    node_priority_t my_priority;
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U, typename V> friend class limiter_node;

    virtual void reset_receiver( reset_flags f ) {
        my_current_count = 0;
        if (f & rf_clear_edges) {
            my_predecessor_count = my_initial_predecessor_count;
        }
    }

    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task.  This is
        called while the sender is blocked in the try_put(). */
    virtual graph_task* execute() = 0;
    template<typename TT, typename M> friend class successor_cache;
    bool is_continue_receiver() override { return true; }

    node_priority_t priority() const override { return my_priority; }
}; // class continue_receiver
384 
#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
    //! Extracts the matching key from a message by calling its key() method.
    // Overload or specialize for message types that have no key() member.
    template <typename K, typename T>
    K key_from_message( const T &t ) {
        return t.key();
    }
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
391 
392 } // d1
393 } // detail
394 } // tbb
395 
396 #include "detail/_flow_graph_trace_impl.h"
397 #include "detail/_hash_compare.h"
398 
399 namespace tbb {
400 namespace detail {
401 namespace d1 {
402 
403 #include "detail/_flow_graph_body_impl.h"
404 #include "detail/_flow_graph_cache_impl.h"
405 #include "detail/_flow_graph_types_impl.h"
406 
407 using namespace graph_policy_namespace;
408 
409 template <typename C, typename N>
410 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
411 {
412     if (begin) current_node = my_graph->my_nodes;
413     //else it is an end iterator by default
414 }
415 
416 template <typename C, typename N>
417 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
418     __TBB_ASSERT(current_node, "graph_iterator at end");
419     return *operator->();
420 }
421 
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    // nullptr for an end iterator.
    return current_node;
}
426 
427 template <typename C, typename N>
428 void graph_iterator<C,N>::internal_forward() {
429     if (current_node) current_node = current_node->next;
430 }
431 
//! Constructs a graph with isolated task_group_context
inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = true;
    cancelled = false;
    caught_exception = false;
    // The graph owns its context: placement-construct it in cache-aligned
    // storage (destroyed manually in ~graph()).
    my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
    fgt_graph(this);
    my_is_active = true;
}
442 
//! Constructs a graph that uses a caller-provided task_group_context.
inline graph::graph(task_group_context& use_this_context) :
    my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = false; // the caller keeps ownership of the context
    cancelled = false;
    caught_exception = false;
    fgt_graph(this);
    my_is_active = true;
}
452 
inline graph::~graph() {
    // Make sure no graph tasks are still in flight before tearing down.
    wait_for_all();
    if (own_context) {
        // The context was placement-constructed into cache-aligned storage,
        // so destroy and deallocate it manually.
        my_context->~task_group_context();
        r1::cache_aligned_deallocate(my_context);
    }
    delete my_task_arena;
}
461 
//! Registers one more outstanding unit of work with the graph's wait context.
inline void graph::reserve_wait() {
    my_wait_context.reserve();
    fgt_reserve_wait(this); // notify tracing tools
}
466 
//! Releases one outstanding unit of work from the graph's wait context.
inline void graph::release_wait() {
    fgt_release_wait(this); // notify tracing tools before the count may hit zero
    my_wait_context.release();
}
471 
472 inline void graph::register_node(graph_node *n) {
473     n->next = nullptr;
474     {
475         spin_mutex::scoped_lock lock(nodelist_mutex);
476         n->prev = my_nodes_last;
477         if (my_nodes_last) my_nodes_last->next = n;
478         my_nodes_last = n;
479         if (!my_nodes) my_nodes = n;
480     }
481 }
482 
483 inline void graph::remove_node(graph_node *n) {
484     {
485         spin_mutex::scoped_lock lock(nodelist_mutex);
486         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
487         if (n->prev) n->prev->next = n->next;
488         if (n->next) n->next->prev = n->prev;
489         if (my_nodes_last == n) my_nodes_last = n->prev;
490         if (my_nodes == n) my_nodes = n->next;
491     }
492     n->prev = n->next = nullptr;
493 }
494 
//! Resets the graph: context, flags, every node, and the task arena.
inline void graph::reset( reset_flags f ) {
    // reset context
    // Deactivate first so no new tasks are spawned while nodes are reset.
    deactivate_graph(*this);

    my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    activate_graph(*this);
}
512 
//! Cancels all graph tasks by cancelling the graph's task_group_context.
inline void graph::cancel() {
    my_context->cancel_group_execution();
}
516 
// Iterator factories: 'true' requests a begin iterator (head of the node
// list), 'false' an end iterator (null current node).
inline graph::iterator graph::begin() { return iterator(this, true); }

inline graph::iterator graph::end() { return iterator(this, false); }

inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::end() const { return const_iterator(this, false); }

inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
528 
// A graph_node registers itself with its graph on construction and
// unregisters on destruction, keeping the graph's node list accurate.
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}

inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
536 
537 #include "detail/_flow_graph_node_impl.h"
538 
539 
//! An executable node that acts as a source, i.e. it has no predecessors

template < typename Output >
    __TBB_requires(std::copyable<Output>)
class input_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;

    // Input node has no input type
    typedef null_type input_type;

    //! Constructor for a node with a successor
    // The node starts inactive; call activate() before it produces items.
    // Two copies of the body are kept: my_body (live) and my_init_body
    // (pristine copy used to restore the body on reset with rf_reset_bodies).
    template< typename Body >
        __TBB_requires(input_node_body<Body, Output>)
     __TBB_NOINLINE_SYM input_node( graph &g, Body body )
         : graph_node(g), my_active(false)
         , my_body( new input_body_leaf< output_type, Body>(body) )
         , my_init_body( new input_body_leaf< output_type, Body>(body) )
         , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and connects it to the given set of successors.
    template <typename Body, typename... Successors>
        __TBB_requires(input_node_body<Body, Output>)
    input_node( const node_set<order::preceding, Successors...>& successors, Body body )
        : input_node(successors.graph_reference(), body)
    {
        make_edges(*this, successors);
    }
#endif

    //! Copy constructor
    // Clones the initial body (not the possibly-advanced live one) and
    // starts inactive with no cached item.
    __TBB_NOINLINE_SYM input_node( const input_node& src )
        : graph_node(src.my_graph), sender<Output>()
        , my_active(false)
        , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

    //! The destructor
    ~input_node() { delete my_body; delete my_init_body; }

    //! Add a new successor to this node
    bool register_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

    //! Request an item from the node
    bool try_get( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        // A reserved item cannot be handed out until it is released/consumed.
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none.  enqueue a task to
        // provide one.
        if ( my_active )
            spawn_put();
        return false;
    }

    //! Reserves an item.
    bool try_reserve( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        // The retained item may now be offered to successors again.
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    bool try_consume( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }

    //! Returns a copy of the stored body object.
    // Body must be the exact type the node was constructed with; otherwise
    // the dynamic_cast throws std::bad_cast.
    template<typename Body>
    Body copy_function_object() {
        input_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

protected:

    //! resets the input_node to its initial state
    void reset_node( reset_flags f) override {
        my_active = false;
        my_reserved = false;
        my_has_cached_item = false;

        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            // Replace the live body with a fresh clone of the initial body.
            input_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

private:
    spin_mutex my_mutex;                    // protects all mutable state below
    bool my_active;                         // items are produced only when active
    input_body<output_type> *my_body;       // live body (may carry state)
    input_body<output_type> *my_init_body;  // pristine copy for reset
    broadcast_cache< output_type > my_successors;
    bool my_reserved;                       // an item is reserved by a successor
    bool my_has_cached_item;                // my_cached_item holds a valid value
    output_type my_cached_item;

    // used by apply_body_bypass, can invoke body of node.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            flow_control control;

            fgt_begin_body( my_body );

            // Run the user body; it signals end-of-input via flow_control.
            my_cached_item = (*my_body)(control);
            my_has_cached_item = !control.is_pipeline_stopped;

            fgt_end_body( my_body );
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Allocates the body-applying task and accounts for it in the graph's wait context.
    graph_task* create_put_task() {
        small_object_allocator allocator{};
        typedef input_node_task_bypass< input_node<output_type> > task_type;
        graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
        my_graph.reserve_wait();
        return t;
    }

    //! Spawns a task that applies the body
    void spawn_put( ) {
        if(is_graph_active(this->my_graph)) {
            spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }

    friend class input_node_task_bypass< input_node<output_type> >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    graph_task* apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return nullptr;

        // If a successor accepted the item, consume it; otherwise keep it
        // cached for a later try_get/try_reserve.
        graph_task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
};  // class input_node
760 
//! Implements a function node that supports Input -> Output
template<typename Input, typename Output = continue_msg, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input> &&
                   std::copy_constructible<Output>)
class function_node
    : public graph_node
    , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
    , public function_output<Output>
{
    typedef cache_aligned_allocator<Input> internals_allocator;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    template< typename Body >
        __TBB_requires(function_node_body<Body, Input, Output>)
     __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
                   Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
        : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
          fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor overload: explicit priority with the default Policy.
    template <typename Body>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node and connects it to the given node set.
    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets the node; clears edges when rf_clear_edges is set.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

};  // class function_node
847 
848 //! implements a function node that supports Input -> (set of outputs)
849 // Output is a tuple of output types.
template<typename Input, typename Output, typename Policy = queueing>
    __TBB_requires(std::default_initializable<Input> &&
                   std::copy_constructible<Input>)
class multifunction_node :
    public graph_node,
    public multifunction_input
    <
        Input,
        typename wrap_tuple_elements<
            std::tuple_size<Output>::value,  // #elements in tuple
            multifunction_output,  // wrap this around each element
            Output // the tuple providing the types
        >::type,
        Policy,
        cache_aligned_allocator<Input>
    >
{
    typedef cache_aligned_allocator<Input> internals_allocator;

protected:
    // Number of output ports: one per element of the Output tuple.
    static const int N = std::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
    typedef multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructs a node in graph g that invokes body with at most
    //! `concurrency` concurrent executions; a_priority sets the node priority.
    template<typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
        // Register the node with the profiling/tracing tools.
        fgt_multioutput_node_with_body<N>(
            CODEPTR(), FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Convenience constructor: explicit priority with a default-constructed Policy.
    template <typename Body>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from the set.
    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       Policy p = Policy(), node_priority_t a_priority = no_priority)
        : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: the copy is created in the same graph as `other`.
    // NOTE(review): copying of the input side is delegated to
    // input_impl_type(other) — confirm its copy semantics in multifunction_input.
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    // all the guts are in multifunction_input...
protected:
    //! Reset is fully delegated to the multifunction_input base.
    void reset_node(reset_flags f) override { input_impl_type::reset(f); }
};  // multifunction_node
925 
926 //! split_node: accepts a tuple as input, forwards each element of the tuple to its
927 //  successors.  The node has unlimited concurrency, so it does not reject inputs.
template<typename TupleType>
class split_node : public graph_node, public receiver<TupleType> {
    static const int N = std::tuple_size<TupleType>::value;
    typedef receiver<TupleType> base_type;
public:
    typedef TupleType input_type;
    typedef typename wrap_tuple_elements<
            N,  // #elements in tuple
            multifunction_output,  // wrap this around each element
            TupleType // the tuple providing the types
        >::type  output_ports_type;

    //! Constructs a split_node in graph g and registers it with the tracing tools.
    __TBB_NOINLINE_SYM explicit split_node(graph &g)
        : graph_node(g),
          my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), this->output_ports());
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from the set.
    template <typename... Args>
    __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor.  The copy lives in the same graph as `other`; its
    //! output ports are freshly initialized, not copied from `other`.
    __TBB_NOINLINE_SYM split_node(const split_node& other)
        : graph_node(other.my_graph), base_type(other),
          my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), this->output_ports());
    }

    //! Access to the tuple of output ports, one per element of TupleType.
    output_ports_type &output_ports() { return my_output_ports; }

protected:
    //! Forwards each element of the incoming tuple through its output port.
    graph_task *try_put_task(const TupleType& t) override {
        // Sending split messages in parallel is not justified, as overheads would prevail.
        // Also, we do not have successors here. So we just tell the task returned here is successful.
        return emit_element<N>::emit_this(this->my_graph, t, output_ports());
    }
    //! Drops all output-port edges when rf_clear_edges is set.
    void reset_node(reset_flags f) override {
        if (f & rf_clear_edges)
            clear_element<N>::clear_this(my_output_ports);

        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
    }
    graph& graph_reference() const override {
        return my_graph;
    }

private:
    output_ports_type my_output_ports;
};
984 
985 //! Implements an executable node that supports continue_msg -> Output
template <typename Output, typename Policy = Policy<void> >
    __TBB_requires(std::copy_constructible<Output>)
class continue_node : public graph_node, public continue_input<Output, Policy>,
                      public function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef continue_input<Output, Policy> input_impl_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
        __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type( g, body, a_priority ),
        fOutput_type(g) {
        // Register the node with the profiling/tracing tools.
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,

                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Convenience constructor: explicit priority with a default-constructed Policy.
    template <typename Body>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, Body body, node_priority_t a_priority )
        : continue_node(g, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from the set.
    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), body, p, a_priority ) {
        make_edges_in_order(nodes, *this);
    }
    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
        : continue_node(nodes, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    // number_of_predecessors is forwarded to continue_input (an extra
    // predecessor count; see continue_input for its exact semantics).
    template <typename Body >
        __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, body, a_priority),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
        : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
        __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t a_priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
#endif

    //! Copy constructor
    // The copy is created in the same graph as src; the output side is
    // constructed fresh from the graph, so successor edges are not copied.
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        function_output<Output>(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;
    //! Accessor used by the broadcast machinery to reach this node's successor cache.
    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets the input side; drops all successor edges when rf_clear_edges is set.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
};  // continue_node
1088 
1089 //! Forwards messages of type T to all successors
template <typename T>
class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
private:
    broadcast_cache<input_type> my_successors;  // set of successors every message is forwarded to
public:

    //! Constructs a broadcast_node in graph g and registers it with the tracing tools.
    __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
        fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from the set.
    template <typename... Args>
    broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor
    // Delegates to the graph constructor: the copy shares only src's graph;
    // no edges are copied.
    __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}

    //! Adds a successor
    bool register_successor( successor_type &r ) override {
        my_successors.register_successor( r );
        return true;
    }

    //! Removes s as a successor
    bool remove_successor( successor_type &r ) override {
        my_successors.remove_successor( r );
        return true;
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! build a task to run the successor if possible.  Default is old behavior.
    // Returns SUCCESSFULLY_ENQUEUED when no successor produced a task to bypass.
    graph_task *try_put_task(const T& t) override {
        graph_task *new_task = my_successors.try_put_task(t);
        if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
        return new_task;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

    //! Drops all successor edges when rf_clear_edges is set.
    void reset_node(reset_flags f) override {
        if (f&rf_clear_edges) {
           my_successors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
    }
};  // broadcast_node
1150 
1151 //! Forwards messages in arbitrary order
template <typename T>
class buffer_node
    : public graph_node
    , public reservable_item_buffer< T, cache_aligned_allocator<T> >
    , public receiver<T>, public sender<T>
{
    typedef cache_aligned_allocator<T> internals_allocator;

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef buffer_node<T> class_type;

protected:
    typedef size_t size_type;
    round_robin_cache< T, null_rw_mutex > my_successors;

    friend class forward_task_bypass< class_type >;

    // Operation codes dispatched by the aggregator (see handle_operations_impl):
    //   reg_succ/rem_succ        - register/remove a successor edge
    //   req_item                 - try_get
    //   res_item/rel_res/con_res - try_reserve/try_release/try_consume
    //   put_item                 - try_put_task
    //   try_fwd_task             - forward buffered items to successors
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
    };

    // implements the aggregator_operation concept
    class buffer_operation : public aggregated_operation< buffer_operation > {
    public:
        char type;          // an op_type value
        T* elem;            // item payload for put/pop/reserve operations
        graph_task* ltask;  // task generated while handling the operation, if any
        successor_type *r;  // successor argument for reg_succ/rem_succ

        buffer_operation(const T& e, op_type t) : type(char(t))
                                                  , elem(const_cast<T*>(&e)) , ltask(nullptr)
                                                  , r(nullptr)
        {}
        buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
    };

    bool forwarder_busy;  // true while a forwarding task is scheduled or running
    typedef aggregating_functor<class_type, buffer_operation> handler_type;
    friend class aggregating_functor<class_type, buffer_operation>;
    aggregator< handler_type, buffer_operation> my_aggregator;

    //! Dispatches a batch of queued operations; virtual so derived buffers can extend it.
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }

    //! Handles each pending operation in turn; if any handled operation may
    //! have made items forwardable and no forwarder task is active, schedules
    //! one and attaches it to the last-handled operation's record.
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        buffer_operation *tmp = nullptr;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res:  internal_release(tmp); try_forwarding = true; break;
            case con_res:  internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
            }
        }

        // Derived classes may reorder buffered items after a batch (no-op here).
        derived->order();

        if (try_forwarding && !forwarder_busy) {
            if(is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                typedef forward_task_bypass<class_type> task_type;
                small_object_allocator allocator{};
                graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
                my_graph.reserve_wait();
                // tmp should point to the last item handled by the aggregator.  This is the operation
                // the handling thread enqueued.  So modifying that record will be okay.
                // TODO revamp: check that the issue is still present
                // workaround for icc bug  (at least 12.0 and 13.0)
                // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
                //        argument types are: (graph, graph_task *, graph_task *)
                graph_task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
            }
        }
    }  // handle_operations

    //! Returns the task (if any) that was produced while handling op_data.
    inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
        return op_data.ltask;
    }

    //! Spawns op_data's generated task, if any; returns true when one was spawned.
    inline bool enqueue_forwarding_task(buffer_operation &op_data) {
        graph_task *ft = grab_forwarding_task(op_data);
        if(ft) {
            spawn_in_graph_arena(graph_reference(), *ft);
            return true;
        }
        return false;
    }

    //! This is executed by an enqueued task, the "forwarder"
    // Repeatedly issues try_fwd_task operations until one fails, combining any
    // tasks produced along the way into a single task to return (task bypass).
    virtual graph_task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        graph_task *last_task = nullptr;
        do {
            op_data.status = WAIT;
            op_data.ltask = nullptr;
            my_aggregator.execute(&op_data);

            // workaround for icc bug
            graph_task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status ==SUCCEEDED);
        return last_task;
    }

    //! Register successor
    virtual void internal_reg_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.register_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Remove successor
    virtual void internal_rem_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.remove_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

private:
    // Hook for derived classes to reorder items after a batch; no-op for plain buffers.
    void order() {}

    // True if the newest slot (my_tail - 1) holds a valid item; buffer_node
    // forwards from the back (see try_put_and_add_task).
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }

    // Offers the newest item to successors; on acceptance, removes it and
    // folds the returned task into last_task.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task *new_task = my_successors.try_put_task(this->back());
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }

protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }

    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        // Nothing can be forwarded while an item is reserved or the buffer is empty.
        if (this->my_reserved || !derived->is_item_valid()) {
            op->status.store(FAILED, std::memory_order_release);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        graph_task* last_task = nullptr;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);

        op->ltask = last_task;  // return task
        // SUCCEEDED only if every successor slot was used and something was
        // forwarded; otherwise the forwarder goes idle.
        if (last_task && !counter) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
            forwarder_busy = false;
        }
    }

    //! Appends the item; returns true so the handler attempts forwarding.
    virtual bool internal_push(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        this->push_back(*(op->elem));
        op->status.store(SUCCEEDED, std::memory_order_release);
        return true;
    }

    //! Pops from the back (newest item); FAILED when the buffer has none.
    virtual void internal_pop(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        if(this->pop_back(*(op->elem))) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Reserves the front item; FAILED when no item can be reserved.
    virtual void internal_reserve(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        if(this->reserve_front(*(op->elem))) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Removes the reserved front item from the buffer.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Releases the reservation; the item stays in the buffer.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

public:
    //! Constructor
    __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
        : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
          sender<T>(), my_successors(this), forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
        fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from the set.
    template <typename... Args>
    buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    // Delegates to the graph constructor: the copy shares only src's graph;
    // buffered items and edges are not copied.
    __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}

    //
    // message sender implementation
    //

    //! Adds a new successor.
    /** Adds successor r to the list of successors; may forward tasks.  */
    bool register_successor( successor_type &r ) override {
        buffer_operation op_data(reg_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Removes a successor.
    /** Removes successor r from the list of successors.
        It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why full qualification is necessary here
        tbb::detail::d1::remove_predecessor(r, *this);
        buffer_operation op_data(rem_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        // even though this operation does not cause a forward, if we are the handler, and
        // a forward is scheduled, we may be the first to reach this point after the aggregator,
        // and so should check for the task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Request an item from the buffer_node
    /**  true = v contains the returned item<BR>
         false = no item has been returned */
    bool try_get( T &v ) override {
        buffer_operation op_data(req_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }

    //! Reserves an item.
    /**  false = no item can be reserved<BR>
         true = an item is reserved */
    bool try_reserve( T &v ) override {
        buffer_operation op_data(res_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }

    //! Release a reserved item.
    /**  true = item has been released and so remains in sender */
    bool try_release() override {
        buffer_operation op_data(rel_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Consumes a reserved item.
    /** true = item is removed from sender and reservation removed */
    bool try_consume() override {
        buffer_operation op_data(con_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! receive an item, return a task *if possible
    graph_task *try_put_task(const T &t) override {
        buffer_operation op_data(t, put_item);
        my_aggregator.execute(&op_data);
        graph_task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.

        if(ft && op_data.status ==FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.)  Queue the task and reset the pointer.
            spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
        }
        else if(!ft && op_data.status ==SUCCEEDED) {
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

protected:
    //! Discards all buffered items; drops successor edges when rf_clear_edges is set.
    void reset_node( reset_flags f) override {
        reservable_item_buffer<T, internals_allocator>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
        forwarder_busy = false;
    }
};  // buffer_node
1502 
1503 //! Forwards messages in FIFO order
template <typename T>
class queue_node : public buffer_node<T> {
protected:
    typedef buffer_node<T> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;

private:
    template<typename> friend class buffer_node;

    // FIFO variant: validity is checked at the head (oldest item), not the tail.
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }

    // Offers the oldest item to successors; on acceptance, removes it from the
    // front and folds the returned task into last_task.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task *new_task = this->my_successors.try_put_task(this->front());
        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            this->destroy_front();
        }
    }

protected:
    //! Reuses the base forwarding loop with this class's FIFO helpers above.
    void internal_forward_task(queue_operation *op) override {
        this->internal_forward_task_impl(op, this);
    }

    //! Pops from the front (FIFO); FAILED when reserved or empty.
    void internal_pop(queue_operation *op) override {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            op->status.store(FAILED, std::memory_order_release);
        }
        else {
            this->pop_front(*(op->elem));
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
    }
    //! Reserves the front item; FAILED when already reserved or empty.
    void internal_reserve(queue_operation *op) override {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            op->status.store(FAILED, std::memory_order_release);
        }
        else {
            this->reserve_front(*(op->elem));
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
    }
    //! Removes the reserved front item from the queue.
    void internal_consume(queue_operation *op) override {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
        fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of `nodes` and wires edges to/from the set.
    template <typename... Args>
    queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
        fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }


protected:
    //! Reset is fully delegated to buffer_node.
    void reset_node( reset_flags f) override {
        base_type::reset_node(f);
    }
};  // queue_node
1590 
//! Forwards messages in sequence order
/** Each incoming item is mapped to a sequence number by a user-supplied
    functor; items are buffered at the slot matching their number and
    emitted in increasing sequence order. */
template <typename T>
    __TBB_requires(std::copyable<T>)
class sequencer_node : public queue_node<T> {
    // Owning pointer to a type-erased copy of the user's sequencer functor;
    // deleted in the destructor and deep-copied via clone() on copy construction.
    // NOTE(review): no copy assignment is declared (rule of three); presumably
    // safe because graph nodes are not assignable -- confirm.
    function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context.  Does this mean it needn't be reset?
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    // Wraps the user's sequencer in a heap-allocated function_body_leaf and
    // registers the node with the profiling/tracing layer.
    template< typename Sequencer >
        __TBB_requires(sequencer<Sequencer, T>)
    __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
        my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor from a node set (preview): wires edges in order.
    template <typename Sequencer, typename... Args>
        __TBB_requires(sequencer<Sequencer, T>)
    sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
        : sequencer_node(nodes.graph_reference(), s) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    // Clones the source's sequencer functor so the copy owns its own instance.
    __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }

    //! Destructor
    ~sequencer_node() { delete my_sequencer; }

protected:
    typedef typename buffer_node<T>::size_type size_type;
    typedef typename buffer_node<T>::buffer_operation sequencer_operation;

private:
    //! Stores an incoming item at the buffer slot given by its sequence number.
    /** Fails when a message with this sequence number was already emitted
        (tag < my_head), or when place_item reports failure -- presumably
        because the slot is already occupied; confirm in the buffer base. */
    bool internal_push(sequencer_operation *op) override {
        // ask the user functor for this item's sequence number
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            op->status.store(FAILED, std::memory_order_release);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;

        // grow the backing array before publishing the new tail
        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;

        const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
        op->status.store(res, std::memory_order_release);
        return res ==SUCCEEDED;
    }
};  // sequencer_node
1661 
1662 //! Forwards messages in priority order
1663 template<typename T, typename Compare = std::less<T>>
1664 class priority_queue_node : public buffer_node<T> {
1665 public:
1666     typedef T input_type;
1667     typedef T output_type;
1668     typedef buffer_node<T> base_type;
1669     typedef priority_queue_node class_type;
1670     typedef typename receiver<input_type>::predecessor_type predecessor_type;
1671     typedef typename sender<output_type>::successor_type successor_type;
1672 
1673     //! Constructor
1674     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
1675         : buffer_node<T>(g), compare(comp), mark(0) {
1676         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1677                                  static_cast<receiver<input_type> *>(this),
1678                                  static_cast<sender<output_type> *>(this) );
1679     }
1680 
1681 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
1682     template <typename... Args>
1683     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
1684         : priority_queue_node(nodes.graph_reference(), comp) {
1685         make_edges_in_order(nodes, *this);
1686     }
1687 #endif
1688 
1689     //! Copy constructor
1690     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
1691         : buffer_node<T>(src), mark(0)
1692     {
1693         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
1694                                  static_cast<receiver<input_type> *>(this),
1695                                  static_cast<sender<output_type> *>(this) );
1696     }
1697 
1698 protected:
1699 
1700     void reset_node( reset_flags f) override {
1701         mark = 0;
1702         base_type::reset_node(f);
1703     }
1704 
1705     typedef typename buffer_node<T>::size_type size_type;
1706     typedef typename buffer_node<T>::item_type item_type;
1707     typedef typename buffer_node<T>::buffer_operation prio_operation;
1708 
1709     //! Tries to forward valid items to successors
1710     void internal_forward_task(prio_operation *op) override {
1711         this->internal_forward_task_impl(op, this);
1712     }
1713 
1714     void handle_operations(prio_operation *op_list) override {
1715         this->handle_operations_impl(op_list, this);
1716     }
1717 
1718     bool internal_push(prio_operation *op) override {
1719         prio_push(*(op->elem));
1720         op->status.store(SUCCEEDED, std::memory_order_release);
1721         return true;
1722     }
1723 
1724     void internal_pop(prio_operation *op) override {
1725         // if empty or already reserved, don't pop
1726         if ( this->my_reserved == true || this->my_tail == 0 ) {
1727             op->status.store(FAILED, std::memory_order_release);
1728             return;
1729         }
1730 
1731         *(op->elem) = prio();
1732         op->status.store(SUCCEEDED, std::memory_order_release);
1733         prio_pop();
1734 
1735     }
1736 
1737     // pops the highest-priority item, saves copy
1738     void internal_reserve(prio_operation *op) override {
1739         if (this->my_reserved == true || this->my_tail == 0) {
1740             op->status.store(FAILED, std::memory_order_release);
1741             return;
1742         }
1743         this->my_reserved = true;
1744         *(op->elem) = prio();
1745         reserved_item = *(op->elem);
1746         op->status.store(SUCCEEDED, std::memory_order_release);
1747         prio_pop();
1748     }
1749 
1750     void internal_consume(prio_operation *op) override {
1751         op->status.store(SUCCEEDED, std::memory_order_release);
1752         this->my_reserved = false;
1753         reserved_item = input_type();
1754     }
1755 
1756     void internal_release(prio_operation *op) override {
1757         op->status.store(SUCCEEDED, std::memory_order_release);
1758         prio_push(reserved_item);
1759         this->my_reserved = false;
1760         reserved_item = input_type();
1761     }
1762 
1763 private:
1764     template<typename> friend class buffer_node;
1765 
1766     void order() {
1767         if (mark < this->my_tail) heapify();
1768         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
1769     }
1770 
1771     bool is_item_valid() {
1772         return this->my_tail > 0;
1773     }
1774 
1775     void try_put_and_add_task(graph_task*& last_task) {
1776         graph_task * new_task = this->my_successors.try_put_task(this->prio());
1777         if (new_task) {
1778             // workaround for icc bug
1779             graph& graph_ref = this->graph_reference();
1780             last_task = combine_tasks(graph_ref, last_task, new_task);
1781             prio_pop();
1782         }
1783     }
1784 
1785 private:
1786     Compare compare;
1787     size_type mark;
1788 
1789     input_type reserved_item;
1790 
1791     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
1792     bool prio_use_tail() {
1793         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
1794         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
1795     }
1796 
1797     // prio_push: checks that the item will fit, expand array if necessary, put at end
1798     void prio_push(const T &src) {
1799         if ( this->my_tail >= this->my_array_size )
1800             this->grow_my_array( this->my_tail + 1 );
1801         (void) this->place_item(this->my_tail, src);
1802         ++(this->my_tail);
1803         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
1804     }
1805 
1806     // prio_pop: deletes highest priority item from the array, and if it is item
1807     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
1808     // and mark.  Assumes the array has already been tested for emptiness; no failure.
1809     void prio_pop()  {
1810         if (prio_use_tail()) {
1811             // there are newly pushed elements; last one higher than top
1812             // copy the data
1813             this->destroy_item(this->my_tail-1);
1814             --(this->my_tail);
1815             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1816             return;
1817         }
1818         this->destroy_item(0);
1819         if(this->my_tail > 1) {
1820             // push the last element down heap
1821             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
1822             this->move_item(0,this->my_tail - 1);
1823         }
1824         --(this->my_tail);
1825         if(mark > this->my_tail) --mark;
1826         if (this->my_tail > 1) // don't reheap for heap of size 1
1827             reheap();
1828         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
1829     }
1830 
1831     const T& prio() {
1832         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
1833     }
1834 
1835     // turn array into heap
1836     void heapify() {
1837         if(this->my_tail == 0) {
1838             mark = 0;
1839             return;
1840         }
1841         if (!mark) mark = 1;
1842         for (; mark<this->my_tail; ++mark) { // for each unheaped element
1843             size_type cur_pos = mark;
1844             input_type to_place;
1845             this->fetch_item(mark,to_place);
1846             do { // push to_place up the heap
1847                 size_type parent = (cur_pos-1)>>1;
1848                 if (!compare(this->get_my_item(parent), to_place))
1849                     break;
1850                 this->move_item(cur_pos, parent);
1851                 cur_pos = parent;
1852             } while( cur_pos );
1853             (void) this->place_item(cur_pos, to_place);
1854         }
1855     }
1856 
1857     // otherwise heapified array with new root element; rearrange to heap
1858     void reheap() {
1859         size_type cur_pos=0, child=1;
1860         while (child < mark) {
1861             size_type target = child;
1862             if (child+1<mark &&
1863                 compare(this->get_my_item(child),
1864                         this->get_my_item(child+1)))
1865                 ++target;
1866             // target now has the higher priority child
1867             if (compare(this->get_my_item(target),
1868                         this->get_my_item(cur_pos)))
1869                 break;
1870             // swap
1871             this->swap_items(cur_pos, target);
1872             cur_pos = target;
1873             child = (cur_pos<<1)+1;
1874         }
1875     }
1876 };  // priority_queue_node
1877 
//! Forwards messages only if the threshold has not been reached
/** This node forwards items until its threshold is reached.
    It contains no buffering.  If the downstream node rejects, the
    message is dropped. */
template< typename T, typename DecrementType=continue_msg >
class limiter_node : public graph_node, public receiver< T >, public sender< T > {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.

private:
    // my_count, my_tries and my_future_decrement are protected by my_mutex.
    // Invariant held under the lock: my_count <= my_threshold; new puts are
    // admitted only while my_count + my_tries < my_threshold.
    size_t my_threshold;
    size_t my_count; // number of successful puts
    size_t my_tries; // number of active put attempts
    size_t my_future_decrement; // number of active decrement
    reservable_predecessor_cache< T, spin_mutex > my_predecessors;
    spin_mutex my_mutex;
    broadcast_cache< T > my_successors;

    //! The internal receiver< DecrementType > that adjusts the count
    threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;

    //! Adjusts my_count by delta, then attempts to forward a pending item.
    /** delta > 0 frees capacity, delta < 0 consumes it; my_count is kept in
        the [0, my_threshold] interval.  When a positive delta exceeds
        my_count while puts are in flight (my_tries > 0), the surplus is
        remembered in my_future_decrement and applied as those puts finish. */
    graph_task* decrement_counter( long long delta ) {
        // my_threshold is written only in the constructor, so reading it
        // without the lock is safe here.
        if ( delta > 0 && size_t(delta) > my_threshold ) {
            delta = my_threshold;
        }

        {
            spin_mutex::scoped_lock lock(my_mutex);
            if ( delta > 0 && size_t(delta) > my_count ) {
                if( my_tries > 0 ) {
                    my_future_decrement += (size_t(delta) - my_count);
                }
                my_count = 0;
            }
            else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
                my_count = my_threshold;
            }
            else {
                my_count -= size_t(delta); // absolute value of delta is sufficiently small
            }
            __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
        }
        return forward_task();
    }

    // Let threshold_regulator call decrement_counter()
    friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;

    friend class forward_task_bypass< limiter_node<T,DecrementType> >;

    bool check_conditions() {  // always called under lock
        return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
    }

    // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
    //! Pulls one item from a reserved predecessor and pushes it downstream.
    graph_task* forward_task() {
        input_type v;
        graph_task* rval = nullptr;
        bool reserved = false;

        {
            spin_mutex::scoped_lock lock(my_mutex);
            if ( check_conditions() )
                ++my_tries; // claim a slot before dropping the lock
            else
                return nullptr;
        }

        //SUCCESS
        // if we can reserve and can put, we consume the reservation
        // we increment the count and decrement the tries
        if ( (my_predecessors.try_reserve(v)) == true ) {
            reserved = true;
            if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
                {
                    spin_mutex::scoped_lock lock(my_mutex);
                    ++my_count;
                    // apply any decrement that arrived while this put was in flight
                    if ( my_future_decrement ) {
                        if ( my_count > my_future_decrement ) {
                            my_count -= my_future_decrement;
                            my_future_decrement = 0;
                        }
                        else {
                            my_future_decrement -= my_count;
                            my_count = 0;
                        }
                    }
                    --my_tries;
                    my_predecessors.try_consume();
                    // more work may be possible; spawn a follow-up forward task
                    if ( check_conditions() ) {
                        if ( is_graph_active(this->my_graph) ) {
                            typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                            small_object_allocator allocator{};
                            graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
                            my_graph.reserve_wait();
                            spawn_in_graph_arena(graph_reference(), *rtask);
                        }
                    }
                }
                return rval;
            }
        }
        //FAILURE
        //if we can't reserve, we decrement the tries
        //if we can reserve but can't put, we decrement the tries and release the reservation
        {
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            if (reserved) my_predecessors.try_release();
            if ( check_conditions() ) {
                if ( is_graph_active(this->my_graph) ) {
                    small_object_allocator allocator{};
                    typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                    graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
                    my_graph.reserve_wait();
                    __TBB_ASSERT(!rval, "Have two tasks to handle");
                    return t;
                }
            }
            return rval;
        }
    }

    //! Registers the node with the profiling/tracing layer.
    void initialize() {
        fgt_node(
            CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
            static_cast<sender<output_type> *>(this)
        );
    }

public:
    //! Constructor
    limiter_node(graph &g, size_t threshold)
        : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
        my_predecessors(this), my_successors(this), decrement(this)
    {
        initialize();
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor from a node set (preview): wires edges in order.
    template <typename... Args>
    limiter_node(const node_set<Args...>& nodes, size_t threshold)
        : limiter_node(nodes.graph_reference(), threshold) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    // Copies only the configuration (graph and threshold); counters start at 0.
    limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}

    //! The interface for accessing internal receiver< DecrementType > that adjusts the count
    receiver<DecrementType>& decrementer() { return decrement; }

    //! Replace the current successor with this new successor
    bool register_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        bool was_empty = my_successors.empty();
        my_successors.register_successor(r);
        //spawn a forward task if this is the only successor
        if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
            if ( is_graph_active(this->my_graph) ) {
                small_object_allocator allocator{};
                typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
                my_graph.reserve_wait();
                spawn_in_graph_arena(graph_reference(), *t);
            }
        }
        return true;
    }

    //! Removes a successor from this node
    /** r.remove_predecessor(*this) is also called. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why qualification is needed for remove_predecessor() call
        tbb::detail::d1::remove_predecessor(r, *this);
        my_successors.remove_successor(r);
        return true;
    }

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_predecessors.add( src );
        // spawn a forward task if capacity and a successor are available
        if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
            small_object_allocator allocator{};
            typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
            graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
            my_graph.reserve_wait();
            spawn_in_graph_arena(graph_reference(), *t);
        }
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        my_predecessors.remove( src );
        return true;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! Puts an item to this receiver
    /** Admits the item only while my_count + my_tries < my_threshold; on a
        successful downstream put the claimed "try" is converted into a
        "count", less any decrement that arrived while the put was in flight. */
    graph_task* try_put_task( const T &t ) override {
        {
            spin_mutex::scoped_lock lock(my_mutex);
            if ( my_count + my_tries >= my_threshold )
                return nullptr;
            else
                ++my_tries;
        }

        graph_task* rtask = my_successors.try_put_task(t);
        if ( !rtask ) {  // try_put_task failed.
            spin_mutex::scoped_lock lock(my_mutex);
            --my_tries;
            if (check_conditions() && is_graph_active(this->my_graph)) {
                small_object_allocator allocator{};
                typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
                rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
                my_graph.reserve_wait();
            }
        }
        else {
            spin_mutex::scoped_lock lock(my_mutex);
            ++my_count;
            // apply any decrement that arrived while this put was in flight
            if ( my_future_decrement ) {
                if ( my_count > my_future_decrement ) {
                    my_count -= my_future_decrement;
                    my_future_decrement = 0;
                }
                else {
                    my_future_decrement -= my_count;
                    my_count = 0;
                }
            }
            --my_tries;
        }
        return rtask;
    }

    graph& graph_reference() const override { return my_graph; }

    //! Resets the counter; clears or resets the edge caches per the flags.
    // NOTE(review): my_tries and my_future_decrement are not reset here --
    // presumably no puts are in flight when a graph is reset; confirm.
    void reset_node( reset_flags f ) override {
        my_count = 0;
        if ( f & rf_clear_edges ) {
            my_predecessors.clear();
            my_successors.clear();
        }
        else {
            my_predecessors.reset();
        }
        decrement.reset_receiver(f);
    }
};  // limiter_node
2141 
2142 #include "detail/_flow_graph_join_impl.h"
2143 
// Primary template; specializations for each join policy (reserving,
// queueing, key_matching) are defined below.
template<typename OutputTuple, typename JP=queueing> class join_node;
2145 
2146 template<typename OutputTuple>
2147 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
2148 private:
2149     static const int N = std::tuple_size<OutputTuple>::value;
2150     typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
2151 public:
2152     typedef OutputTuple output_type;
2153     typedef typename unfolded_type::input_ports_type input_ports_type;
2154      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2155         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2156                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2157     }
2158 
2159 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2160     template <typename... Args>
2161     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
2162         make_edges_in_order(nodes, *this);
2163     }
2164 #endif
2165 
2166     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2167         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
2168                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2169     }
2170 
2171 };
2172 
2173 template<typename OutputTuple>
2174 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
2175 private:
2176     static const int N = std::tuple_size<OutputTuple>::value;
2177     typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
2178 public:
2179     typedef OutputTuple output_type;
2180     typedef typename unfolded_type::input_ports_type input_ports_type;
2181      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
2182         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2183                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2184     }
2185 
2186 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2187     template <typename... Args>
2188     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
2189         make_edges_in_order(nodes, *this);
2190     }
2191 #endif
2192 
2193     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
2194         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
2195                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
2196     }
2197 
2198 };
2199 
#if __TBB_CPP20_CONCEPTS_PRESENT
// Helper function which is well-formed only if all of the elements in OutputTuple
// satisfy join_node_function_object<body[i], tuple[i], K>.
// Declaration only -- it is named inside an unevaluated requires-expression.
template <typename OutputTuple, typename K,
          typename... Functions, std::size_t... Idx>
void join_node_function_objects_helper( std::index_sequence<Idx...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
             (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);

// Satisfied when the helper above is callable: the number of body functions
// matches the tuple size and each body matches its tuple element.
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
};

#endif
2215 
2216 // template for key_matching join_node
2217 // tag_matching join_node is a specialization of key_matching, and is source-compatible.
2218 template<typename OutputTuple, typename K, typename KHash>
2219 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
2220       key_matching_port, OutputTuple, key_matching<K,KHash> > {
2221 private:
2222     static const int N = std::tuple_size<OutputTuple>::value;
2223     typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
2224 public:
2225     typedef OutputTuple output_type;
2226     typedef typename unfolded_type::input_ports_type input_ports_type;
2227 
2228 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
2229     join_node(graph &g) : unfolded_type(g) {}
2230 #endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
2231 
2232     template<typename __TBB_B0, typename __TBB_B1>
2233         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
2234      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
2235         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2236                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2237     }
2238     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
2239         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
2240      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2241         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
2242                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2243     }
    // Constructors taking one key-matching body per input port (ports 4..10).
    // Each forwards the bodies to the unfolded base and then registers the node
    // with the tracing/profiling layer (fgt_* is a no-op unless profiling is on).
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
            unfolded_type(g, b0, b1, b2, b3, b4) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_VARIADIC_MAX >= 6
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 7
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
            unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 8
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 9
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif
#if __TBB_VARIADIC_MAX >= 10
    template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
        typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
        __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
     __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
            __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#endif

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructor from a node_set: delegates to the (graph, bodies...) constructor
    // and then wires edges from/to the nodes of the set in order.
    template <
#if (__clang_major__ == 3 && __clang_minor__ == 4)
        // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
        template<typename...> class node_set,
#endif
        typename... Args, typename... Bodies
    >
    __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
    __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
        : join_node(nodes.graph_reference(), bodies...) {
        make_edges_in_order(nodes, *this);
    }
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy constructor: delegates to the unfolded base, then re-registers the new
    // instance with the tracing layer.
    __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
                                                           this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

2327 
2328 };
2329 
2330 // indexer node
2331 #include "detail/_flow_graph_indexer_impl.h"
2332 
2333 // TODO: Implement interface with variadic template or tuple
2334 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
2335                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
2336                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
2337 
2338 //indexer node specializations
2339 template<typename T0>
2340 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
2341 private:
2342     static const int N = 1;
2343 public:
2344     typedef std::tuple<T0> InputTuple;
2345     typedef tagged_msg<size_t, T0> output_type;
2346     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2347     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2348         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2349                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2350     }
2351 
2352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2353     template <typename... Args>
2354     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2355         make_edges_in_order(nodes, *this);
2356     }
2357 #endif
2358 
2359     // Copy constructor
2360     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2361         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2362                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2363     }
2364 };
2365 
2366 template<typename T0, typename T1>
2367 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
2368 private:
2369     static const int N = 2;
2370 public:
2371     typedef std::tuple<T0, T1> InputTuple;
2372     typedef tagged_msg<size_t, T0, T1> output_type;
2373     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2374     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2375         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2376                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2377     }
2378 
2379 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2380     template <typename... Args>
2381     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2382         make_edges_in_order(nodes, *this);
2383     }
2384 #endif
2385 
2386     // Copy constructor
2387     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2388         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2389                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2390     }
2391 
2392 };
2393 
2394 template<typename T0, typename T1, typename T2>
2395 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
2396 private:
2397     static const int N = 3;
2398 public:
2399     typedef std::tuple<T0, T1, T2> InputTuple;
2400     typedef tagged_msg<size_t, T0, T1, T2> output_type;
2401     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2402     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2403         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2404                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2405     }
2406 
2407 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2408     template <typename... Args>
2409     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2410         make_edges_in_order(nodes, *this);
2411     }
2412 #endif
2413 
2414     // Copy constructor
2415     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2416         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2417                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2418     }
2419 
2420 };
2421 
2422 template<typename T0, typename T1, typename T2, typename T3>
2423 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
2424 private:
2425     static const int N = 4;
2426 public:
2427     typedef std::tuple<T0, T1, T2, T3> InputTuple;
2428     typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
2429     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2430     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2431         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2432                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2433     }
2434 
2435 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2436     template <typename... Args>
2437     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2438         make_edges_in_order(nodes, *this);
2439     }
2440 #endif
2441 
2442     // Copy constructor
2443     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2444         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2445                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2446     }
2447 
2448 };
2449 
2450 template<typename T0, typename T1, typename T2, typename T3, typename T4>
2451 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
2452 private:
2453     static const int N = 5;
2454 public:
2455     typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
2456     typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
2457     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2458     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2459         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2460                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2461     }
2462 
2463 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2464     template <typename... Args>
2465     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2466         make_edges_in_order(nodes, *this);
2467     }
2468 #endif
2469 
2470     // Copy constructor
2471     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2472         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2473                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2474     }
2475 
2476 };
2477 
2478 #if __TBB_VARIADIC_MAX >= 6
2479 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
2480 class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
2481 private:
2482     static const int N = 6;
2483 public:
2484     typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
2485     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
2486     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2487     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2488         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2489                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2490     }
2491 
2492 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2493     template <typename... Args>
2494     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2495         make_edges_in_order(nodes, *this);
2496     }
2497 #endif
2498 
2499     // Copy constructor
2500     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2501         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2502                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2503     }
2504 
2505 };
2506 #endif //variadic max 6
2507 
2508 #if __TBB_VARIADIC_MAX >= 7
2509 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2510          typename T6>
2511 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
2512 private:
2513     static const int N = 7;
2514 public:
2515     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
2516     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
2517     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2518     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2519         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2520                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2521     }
2522 
2523 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2524     template <typename... Args>
2525     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2526         make_edges_in_order(nodes, *this);
2527     }
2528 #endif
2529 
2530     // Copy constructor
2531     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2532         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2533                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2534     }
2535 
2536 };
2537 #endif //variadic max 7
2538 
2539 #if __TBB_VARIADIC_MAX >= 8
2540 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2541          typename T6, typename T7>
2542 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
2543 private:
2544     static const int N = 8;
2545 public:
2546     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
2547     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
2548     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2549     indexer_node(graph& g) : unfolded_type(g) {
2550         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2551                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2552     }
2553 
2554 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2555     template <typename... Args>
2556     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2557         make_edges_in_order(nodes, *this);
2558     }
2559 #endif
2560 
2561     // Copy constructor
2562     indexer_node( const indexer_node& other ) : unfolded_type(other) {
2563         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2564                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2565     }
2566 
2567 };
2568 #endif //variadic max 8
2569 
2570 #if __TBB_VARIADIC_MAX >= 9
2571 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2572          typename T6, typename T7, typename T8>
2573 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
2574 private:
2575     static const int N = 9;
2576 public:
2577     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
2578     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
2579     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2580     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2581         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2582                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2583     }
2584 
2585 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2586     template <typename... Args>
2587     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2588         make_edges_in_order(nodes, *this);
2589     }
2590 #endif
2591 
2592     // Copy constructor
2593     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2594         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2595                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2596     }
2597 
2598 };
2599 #endif //variadic max 9
2600 
2601 #if __TBB_VARIADIC_MAX >= 10
2602 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
2603          typename T6, typename T7, typename T8, typename T9>
2604 class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
2605 private:
2606     static const int N = 10;
2607 public:
2608     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
2609     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
2610     typedef unfolded_indexer_node<InputTuple> unfolded_type;
2611     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
2612         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2613                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2614     }
2615 
2616 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2617     template <typename... Args>
2618     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
2619         make_edges_in_order(nodes, *this);
2620     }
2621 #endif
2622 
2623     // Copy constructor
2624     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
2625         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
2626                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
2627     }
2628 
2629 };
2630 #endif //variadic max 10
2631 
2632 template< typename T >
2633 inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
2634     register_successor(p, s);
2635     fgt_make_edge( &p, &s );
2636 }
2637 
2638 //! Makes an edge between a single predecessor and a single successor
2639 template< typename T >
2640 inline void make_edge( sender<T> &p, receiver<T> &s ) {
2641     internal_make_edge( p, s );
2642 }
2643 
//Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
// The unnamed default template arguments restrict this overload (SFINAE) to types
// exposing both port tuples.
template< typename MultiOutputNode, typename MultiInputNode,
          typename = typename MultiOutputNode::output_ports_type,
          typename = typename MultiInputNode::input_ports_type >
inline void make_edge( MultiOutputNode& predecessor, MultiInputNode& successor ) {
    make_edge(std::get<0>(predecessor.output_ports()), std::get<0>(successor.input_ports()));
}
2650 
2651 //Makes an edge from port 0 of a multi-output predecessor to a receiver.
2652 template< typename T, typename R,
2653           typename = typename T::output_ports_type >
2654 inline void make_edge( T& output, receiver<R>& input) {
2655      make_edge(std::get<0>(output.output_ports()), input);
2656 }
2657 
2658 //Makes an edge from a sender to port 0 of a multi-input successor.
2659 template< typename S,  typename V,
2660           typename = typename V::input_ports_type >
2661 inline void make_edge( sender<S>& output, V& input) {
2662      make_edge(output, std::get<0>(input.input_ports()));
2663 }
2664 
2665 template< typename T >
2666 inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
2667     remove_successor( p, s );
2668     fgt_remove_edge( &p, &s );
2669 }
2670 
2671 //! Removes an edge between a single predecessor and a single successor
2672 template< typename T >
2673 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
2674     internal_remove_edge( p, s );
2675 }
2676 
//Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
// The unnamed default template arguments restrict this overload (SFINAE) to types
// exposing both port tuples.
template< typename MultiOutputNode, typename MultiInputNode,
          typename = typename MultiOutputNode::output_ports_type,
          typename = typename MultiInputNode::input_ports_type >
inline void remove_edge( MultiOutputNode& predecessor, MultiInputNode& successor ) {
    remove_edge(std::get<0>(predecessor.output_ports()), std::get<0>(successor.input_ports()));
}
2683 
2684 //Removes an edge between port 0 of a multi-output predecessor and a receiver.
2685 template< typename T, typename R,
2686           typename = typename T::output_ports_type >
2687 inline void remove_edge( T& output, receiver<R>& input) {
2688      remove_edge(std::get<0>(output.output_ports()), input);
2689 }
2690 //Removes an edge between a sender and port 0 of a multi-input successor.
2691 template< typename S,  typename V,
2692           typename = typename V::input_ports_type >
2693 inline void remove_edge( sender<S>& output, V& input) {
2694      remove_edge(output, std::get<0>(input.input_ports()));
2695 }
2696 
//! Returns a copy of the body from a function or continue node
// Node must provide a copy_function_object<Body>() member template; the
// 'template' keyword is required to disambiguate the dependent member call.
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}
2702 
2703 //composite_node
2704 template< typename InputTuple, typename OutputTuple > class composite_node;
2705 
2706 template< typename... InputTypes, typename... OutputTypes>
2707 class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
2708 
2709 public:
2710     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2711     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2712 
2713 private:
2714     std::unique_ptr<input_ports_type> my_input_ports;
2715     std::unique_ptr<output_ports_type> my_output_ports;
2716 
2717     static const size_t NUM_INPUTS = sizeof...(InputTypes);
2718     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2719 
2720 protected:
2721     void reset_node(reset_flags) override {}
2722 
2723 public:
2724     composite_node( graph &g ) : graph_node(g) {
2725         fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
2726     }
2727 
2728     template<typename T1, typename T2>
2729     void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
2730         static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2731         static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2732 
2733         fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2734         fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2735 
2736         my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
2737         my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
2738     }
2739 
2740     template< typename... NodeTypes >
2741     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2742 
2743     template< typename... NodeTypes >
2744     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2745 
2746 
2747     input_ports_type& input_ports() {
2748          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2749          return *my_input_ports;
2750     }
2751 
2752     output_ports_type& output_ports() {
2753          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2754          return *my_output_ports;
2755     }
2756 };  // class composite_node
2757 
2758 //composite_node with only input ports
2759 template< typename... InputTypes>
2760 class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
2761 public:
2762     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
2763 
2764 private:
2765     std::unique_ptr<input_ports_type> my_input_ports;
2766     static const size_t NUM_INPUTS = sizeof...(InputTypes);
2767 
2768 protected:
2769     void reset_node(reset_flags) override {}
2770 
2771 public:
2772     composite_node( graph &g ) : graph_node(g) {
2773         fgt_composite( CODEPTR(), this, &g );
2774     }
2775 
2776    template<typename T>
2777    void set_external_ports(T&& input_ports_tuple) {
2778        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
2779 
2780        fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
2781 
2782        my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
2783    }
2784 
2785     template< typename... NodeTypes >
2786     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2787 
2788     template< typename... NodeTypes >
2789     void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2790 
2791 
2792     input_ports_type& input_ports() {
2793          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
2794          return *my_input_ports;
2795     }
2796 
2797 };  // class composite_node
2798 
2799 //composite_nodes with only output_ports
2800 template<typename... OutputTypes>
2801 class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
2802 public:
2803     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
2804 
2805 private:
2806     std::unique_ptr<output_ports_type> my_output_ports;
2807     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
2808 
2809 protected:
2810     void reset_node(reset_flags) override {}
2811 
2812 public:
2813     __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
2814         fgt_composite( CODEPTR(), this, &g );
2815     }
2816 
2817    template<typename T>
2818    void set_external_ports(T&& output_ports_tuple) {
2819        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
2820 
2821        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
2822 
2823        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
2824    }
2825 
2826     template<typename... NodeTypes >
2827     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
2828 
2829     template<typename... NodeTypes >
2830     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
2831 
2832 
2833     output_ports_type& output_ports() {
2834          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
2835          return *my_output_ports;
2836     }
2837 
2838 };  // class composite_node
2839 
2840 template<typename Gateway>
2841 class async_body_base: no_assign {
2842 public:
2843     typedef Gateway gateway_type;
2844 
2845     async_body_base(gateway_type *gateway): my_gateway(gateway) { }
2846     void set_gateway(gateway_type *gateway) {
2847         my_gateway = gateway;
2848     }
2849 
2850 protected:
2851     gateway_type *my_gateway;
2852 };
2853 
2854 template<typename Input, typename Ports, typename Gateway, typename Body>
2855 class async_body: public async_body_base<Gateway> {
2856 private:
2857     Body my_body;
2858 
2859 public:
2860     typedef async_body_base<Gateway> base_type;
2861     typedef Gateway gateway_type;
2862 
2863     async_body(const Body &body, gateway_type *gateway)
2864         : base_type(gateway), my_body(body) { }
2865 
2866     void operator()( const Input &v, Ports & ) noexcept(noexcept(my_body(v, std::declval<gateway_type&>()))) {
2867         my_body(v, *this->my_gateway);
2868     }
2869 
2870     Body get_body() { return my_body; }
2871 };
2872 
2873 //! Implements async node
2874 template < typename Input, typename Output,
2875            typename Policy = queueing_lightweight >
2876     __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
2877 class async_node
2878     : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
2879 {
2880     typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
2881     typedef multifunction_input<
2882         Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
2883 
2884 public:
2885     typedef Input input_type;
2886     typedef Output output_type;
2887     typedef receiver<input_type> receiver_type;
2888     typedef receiver<output_type> successor_type;
2889     typedef sender<input_type> predecessor_type;
2890     typedef receiver_gateway<output_type> gateway_type;
2891     typedef async_body_base<gateway_type> async_body_base_type;
2892     typedef typename base_type::output_ports_type output_ports_type;
2893 
2894 private:
2895     class receiver_gateway_impl: public receiver_gateway<Output> {
2896     public:
2897         receiver_gateway_impl(async_node* node): my_node(node) {}
2898         void reserve_wait() override {
2899             fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
2900             my_node->my_graph.reserve_wait();
2901         }
2902 
2903         void release_wait() override {
2904             async_node* n = my_node;
2905             graph* g = &n->my_graph;
2906             g->release_wait();
2907             fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
2908         }
2909 
2910         //! Implements gateway_type::try_put for an external activity to submit a message to FG
2911         bool try_put(const Output &i) override {
2912             return my_node->try_put_impl(i);
2913         }
2914 
2915     private:
2916         async_node* my_node;
2917     } my_gateway;
2918 
2919     //The substitute of 'this' for member construction, to prevent compiler warnings
2920     async_node* self() { return this; }
2921 
2922     //! Implements gateway_type::try_put for an external activity to submit a message to FG
2923     bool try_put_impl(const Output &i) {
2924         multifunction_output<Output> &port_0 = output_port<0>(*this);
2925         broadcast_cache<output_type>& port_successors = port_0.successors();
2926         fgt_async_try_put_begin(this, &port_0);
2927         // TODO revamp: change to std::list<graph_task*>
2928         graph_task_list tasks;
2929         bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
2930         __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
2931                       "Return status is inconsistent with the method operation." );
2932 
2933         while( !tasks.empty() ) {
2934             enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
2935         }
2936         fgt_async_try_put_end(this, &port_0);
2937         return is_at_least_one_put_successful;
2938     }
2939 
public:
    //! Constructs an async_node over graph \p g running a copy of \p body with the
    //! given \p concurrency limit and task priority \p a_priority.
    //! The user body is wrapped in an async_body that also carries a pointer to this
    //! node's gateway; only my_gateway's address is taken here, which is safe even
    //! though my_gateway itself is constructed after the base class.
    template<typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : base_type(
        g, concurrency,
        async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
        (body, &my_gateway), a_priority ), my_gateway(self()) {
        // Register the node with the flow-graph trace (profiling) machinery.
        fgt_multioutput_node_with_body<1>(
            CODEPTR(), FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }
2956 
    //! Convenience overload: same as the primary constructor but lets the caller
    //! specify a priority without spelling out the (tag-only) Policy argument.
    template <typename Body>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(g, concurrency, body, Policy(), a_priority) {}
2961 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview API: constructs the node in the graph owning \p nodes and wires
    //! edges to/from every node in the set (order preserved).
    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        const node_set<Args...>& nodes, size_t concurrency, Body body,
        Policy = Policy(), node_priority_t a_priority = no_priority )
        : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    //! Preview API: node_set overload taking an explicit priority without a Policy tag.
    template <typename Body, typename... Args>
        __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2977 
    //! Copy constructor. The base class copies the bodies from \p other; the copied
    //! async_body wrappers still point at other's gateway, so both the current and
    //! the initial body are re-pointed to this node's own my_gateway here.
    __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
        static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
        static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);

        // Register the copy with the flow-graph trace (profiling) machinery.
        fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }
2986 
    //! Returns the gateway through which an external activity submits results
    //! back into the graph (see gateway_type::try_put / reserve_wait / release_wait).
    gateway_type& gateway() {
        return my_gateway;
    }
2990 
    // Define sender< Output >

    //! Add a new successor to this node
    //! Intentionally rejected: successors must be attached to the node's output
    //! ports (output_port<0>(*this)), not to the node itself.
    bool register_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be registered only via ports");
        return false;
    }
2998 
    //! Removes a successor from this node
    //! Intentionally rejected: successors must be detached via the output ports,
    //! mirroring register_successor above.
    bool remove_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be removed only via ports");
        return false;
    }
3004 
    //! Returns a copy of the user-supplied body of type \p Body.
    //! The stored body is down-cast (checked, via dynamic_cast) to the concrete
    //! multifunction_body_leaf holding the async_body wrapper, and the user body is
    //! extracted from that wrapper. Throws std::bad_cast if Body does not match the
    //! type the node was constructed with.
    template<typename Body>
    Body copy_function_object() {
        typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
        typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
        return ab.get_body();
    }
3013 
protected:

    //! Resets the node state according to flags \p f; async_node adds nothing
    //! beyond the base multifunction-node reset.
    void reset_node( reset_flags f) override {
       base_type::reset_node(f);
    }
3019 };
3020 
3021 #include "detail/_flow_graph_node_set_impl.h"
3022 
//! A single-item buffer whose value can be overwritten at any time.
//! Each try_put stores the new value and broadcasts it to successors; try_get
//! succeeds while a value is stored and has not been cleared. Acts as both a
//! receiver<T> and a sender<T>.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructs an empty (invalid-buffer) node bound to graph \p g.
    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
        : graph_node(g), my_successors(this), my_buffer_is_valid(false)
    {
        fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Preview API: construct in the graph owning \p nodes and wire edges in order.
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor; doesn't take anything from src; default won't work
    __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}

    ~overwrite_node() {}

    //! Adds successor \p s; if a valid value is buffered and the graph is active,
    //! the value is forwarded to \p s immediately.
    bool register_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // In case of reservation a race between the moment of reservation and register_successor can appear,
                // because failed reserve does not mean that register_successor is not ready to put a message immediately.
                // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
                // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
                small_object_allocator allocator{};
                typedef register_predecessor_task task_type;
                graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
                graph_reference().reserve_wait();
                spawn_in_graph_arena( my_graph, *t );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }

    //! Removes successor \p s; always reports success.
    bool remove_successor( successor_type &s ) override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }

    //! Copies the buffered value into \p v; fails if no valid value is stored.
    bool try_get( input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }

    //! Reserves an item
    // Reservation does not lock the buffer: it is simply a get, and release/consume
    // below are no-ops accordingly.
    bool try_reserve( T &v ) override {
        return try_get(v);
    }

    //! Releases the reserved item
    bool try_release() override { return true; }

    //! Consumes the reserved item
    bool try_consume() override { return true; }

    //! Returns true if a valid value is currently buffered.
    bool is_valid() {
       spin_mutex::scoped_lock l( my_mutex );
       return my_buffer_is_valid;
    }

    //! Invalidates the buffered value (subsequent try_get calls fail until the
    //! next successful put).
    void clear() {
       spin_mutex::scoped_lock l( my_mutex );
       my_buffer_is_valid = false;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! Accepts every put: stores the value and broadcasts it (under the lock).
    graph_task* try_put_task( const input_type &v ) override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v);
    }

    // Caller must hold my_mutex. Overwrites the buffer, marks it valid, and
    // broadcasts the value; never returns nullptr (SUCCESSFULLY_ENQUEUED marks
    // acceptance when no successor task was produced).
    graph_task * try_put_task_impl(const input_type &v) {
        my_buffer = v;
        my_buffer_is_valid = true;
        graph_task* rtask = my_successors.try_put_task(v);
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

    //! Breaks an infinite loop between the node reservation and register_successor call
    // Spawned from register_successor when an immediate forward to a reserving
    // successor fails; retries the edge registration asynchronously.
    struct register_predecessor_task : public graph_task {
        register_predecessor_task(
            graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
            : graph_task(g, allocator), o(owner), s(succ) {};

        task* execute(execution_data& ed) override {
            // TODO revamp: investigate why qualification is needed for register_successor() call
            using tbb::detail::d1::register_predecessor;
            using tbb::detail::d1::register_successor;
            if ( !register_predecessor(s, o) ) {
                register_successor(o, s);
            }
            finalize<register_predecessor_task>(ed);
            return nullptr;
        }

        task* cancel(execution_data& ed) override {
            finalize<register_predecessor_task>(ed);
            return nullptr;
        }

        predecessor_type& o;
        successor_type& s;
    };

    spin_mutex my_mutex;                                      // guards buffer and successor list
    broadcast_cache< input_type, null_rw_mutex > my_successors;
    input_type my_buffer;                                     // last value put (meaningful only when valid)
    bool my_buffer_is_valid;

    //! Invalidates the buffer; optionally drops all edges per \p f.
    void reset_node( reset_flags f) override {
        my_buffer_is_valid = false;
       if (f&rf_clear_edges) {
           my_successors.clear();
       }
    }
};  // overwrite_node
3172 
3173 template< typename T >
3174 class write_once_node : public overwrite_node<T> {
3175 public:
3176     typedef T input_type;
3177     typedef T output_type;
3178     typedef overwrite_node<T> base_type;
3179     typedef typename receiver<input_type>::predecessor_type predecessor_type;
3180     typedef typename sender<output_type>::successor_type successor_type;
3181 
3182     //! Constructor
3183     __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
3184         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3185                                  static_cast<receiver<input_type> *>(this),
3186                                  static_cast<sender<output_type> *>(this) );
3187     }
3188 
3189 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
3190     template <typename... Args>
3191     write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
3192         make_edges_in_order(nodes, *this);
3193     }
3194 #endif
3195 
3196     //! Copy constructor: call base class copy constructor
3197     __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
3198         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
3199                                  static_cast<receiver<input_type> *>(this),
3200                                  static_cast<sender<output_type> *>(this) );
3201     }
3202 
3203 protected:
3204     template< typename R, typename B > friend class run_and_put_task;
3205     template<typename X, typename Y> friend class broadcast_cache;
3206     template<typename X, typename Y> friend class round_robin_cache;
3207     graph_task *try_put_task( const T &v ) override {
3208         spin_mutex::scoped_lock l( this->my_mutex );
3209         return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
3210     }
3211 }; // write_once_node
3212 
//! set_name overloads: attach a human-readable name to a graph or node.
//! Each overload forwards to the matching flow-graph-trace (fgt) describer so
//! profiling/tracing tools can display the name; the overload used depends on
//! whether the node has a single output, multiple outputs, or multiple
//! inputs and outputs.
inline void set_name(const graph& g, const char *name) {
    fgt_graph_desc(&g, name);
}

template <typename Output>
inline void set_name(const input_node<Output>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Input, typename Output, typename Policy>
inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Output, typename Policy>
inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const broadcast_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const buffer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const queue_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const sequencer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename Compare>
inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename DecrementType>
inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename OutputTuple, typename JP>
inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename... Types>
inline void set_name(const indexer_node<Types...>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const overwrite_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const write_once_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

// Multi-output nodes use the multioutput describer.
template<typename Input, typename Output, typename Policy>
inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template<typename TupleType>
inline void set_name(const split_node<TupleType>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

// composite_node may have several inputs and outputs.
template< typename InputTuple, typename OutputTuple >
inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
    fgt_multiinput_multioutput_node_desc(&node, name);
}

template<typename Input, typename Output, typename Policy>
inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
{
    fgt_multioutput_node_desc(&node, name);
}
3302 } // d1
3303 } // detail
3304 } // tbb
3305 
3306 
3307 // Include deduction guides for node classes
3308 #include "detail/_flow_graph_nodes_deduction.h"
3309 
// Public API surface: re-export the implementation from tbb::detail::d1 under
// the stable tbb::flow::v1 (inline) namespace.
namespace tbb {
namespace flow {
inline namespace v1 {
    // Core message-passing interfaces.
    using detail::d1::receiver;
    using detail::d1::sender;

    // Concurrency limits for function-style nodes.
    using detail::d1::serial;
    using detail::d1::unlimited;

    // graph::reset flags.
    using detail::d1::reset_flags;
    using detail::d1::rf_reset_protocol;
    using detail::d1::rf_reset_bodies;
    using detail::d1::rf_clear_edges;

    using detail::d1::graph;
    using detail::d1::graph_node;
    using detail::d1::continue_msg;

    // Node types and related helpers.
    using detail::d1::input_node;
    using detail::d1::function_node;
    using detail::d1::multifunction_node;
    using detail::d1::split_node;
    using detail::d1::output_port;
    using detail::d1::indexer_node;
    using detail::d1::tagged_msg;
    using detail::d1::cast_to;
    using detail::d1::is_a;
    using detail::d1::continue_node;
    using detail::d1::overwrite_node;
    using detail::d1::write_once_node;
    using detail::d1::broadcast_node;
    using detail::d1::buffer_node;
    using detail::d1::queue_node;
    using detail::d1::sequencer_node;
    using detail::d1::priority_queue_node;
    using detail::d1::limiter_node;
    using namespace detail::d1::graph_policy_namespace;
    using detail::d1::join_node;
    using detail::d1::input_port;
    using detail::d1::copy_body;
    using detail::d1::make_edge;
    using detail::d1::remove_edge;
    using detail::d1::tag_value;
    using detail::d1::composite_node;
    using detail::d1::async_node;
    using detail::d1::node_priority_t;
    using detail::d1::no_priority;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    using detail::d1::follows;
    using detail::d1::precedes;
    using detail::d1::make_node_set;
    using detail::d1::make_edges;
#endif

} // v1
} // flow

    using detail::d1::flow_control;

// Profiling helpers live under tbb::profiling.
namespace profiling {
    using detail::d1::set_name;
} // profiling

} // tbb
3375 
3376 
3377 #if TBB_USE_PROFILING_TOOLS  && ( __unix__ || __APPLE__ )
3378    // We don't do pragma pop here, since it still gives warning on the USER side
3379    #undef __TBB_NOINLINE_SYM
3380 #endif
3381 
3382 #endif // __TBB_flow_graph_H
3383