/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_node_impl_H
#define __TBB__flow_graph_node_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "_flow_graph_item_buffer_impl.h"
template< typename T, typename A >
class function_input_queue : public item_buffer<T,A> {
public:
    bool empty() const {
        return this->buffer_empty();
    }

    const T& front() const {
        return this->item_buffer<T, A>::front();
    }

    void pop() {
        this->destroy_front();
    }

    bool push( T& t ) {
        return this->push_back( t );
    }
};
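
// Illustrative sketch (not part of this header): with the default queueing
// policy, a function_node buffers inputs that exceed its concurrency limit in
// a function_input_queue, so every try_put() below is accepted:
//
//   tbb::flow::graph g;
//   tbb::flow::function_node<int, int> n(g, /*max_concurrency=*/1,
//       [](int v) { return v * 2; });
//   for (int i = 0; i < 10; ++i) n.try_put(i);   // excess items are queued
//   g.wait_for_all();
//
// A rejecting node (policy tbb::flow::rejecting) has no queue: try_put()
// returns false once its concurrency is saturated.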

//! Input and scheduling for a function node that takes a type Input as input
//  The only up-ref is apply_body_impl, which should implement the function
//  call and any handling of the result.
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    enum op_type { reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    //! The input type of this receiver
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value,
                  "queueing and rejecting policies are mutually exclusive");

    //! Constructor for function_input_base
    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    //! Copy constructor
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {}

    //! Destructor
    // The queue is allocated by the constructor for {multi}function_node.
    // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
    // This would be an interface-breaking change.
    virtual ~function_input_base() {
        if ( my_queue ) delete my_queue;
    }

    graph_task* try_put_task( const input_type& t) override {
        return try_put_task_impl(t, has_policy<lightweight, Policy>());
    }

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:

    void reset_function_input_base( reset_flags f) {
        my_concurrency = 0;
        if(my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;
    size_t my_concurrency;
    node_priority_t my_priority;
    input_queue_type *my_queue;
    predecessor_cache<input_type, null_mutex > my_predecessors;

    void reset_receiver( reset_flags f) {
        if( f & rf_clear_edges) my_predecessors.clear();
        else my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;
            predecessor_type *r;
        };
        graph_task* bypass_t;
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)) {}
        operation_type(op_type t) : type(char(t)), r(NULL) {}
    };

    bool forwarder_busy;
    typedef aggregating_functor<class_type, operation_type> handler_type;
    friend class aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

    graph_task* perform_queued_requests() {
        graph_task* new_task = NULL;
        if(my_queue) {
            if(!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());
                my_queue->pop();
            }
        }
        else {
            input_type i;
            if(my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
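
    // The aggregator pattern used below: callers fill in an operation_type
    // record and submit it through my_aggregator.execute(); a single thread
    // then drains the whole batched list in handle_operations(). This keeps
    // my_concurrency, the input queue and the predecessor cache consistent
    // without a per-node mutex.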
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            case app_body_bypass: {
                tmp->bypass_t = NULL;
                __TBB_ASSERT(my_max_concurrency != 0, NULL);
                --my_concurrency;
                if (my_concurrency < my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            }
            case tryput_bypass: internal_try_put_task(tmp);  break;
            case try_fwd: internal_forward(tmp);  break;
            case occupy_concurrency:
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                } else {
                    tmp->status.store(FAILED, std::memory_order_release);
                }
                break;
            }
        }
    }

    //! Put to the node, but return the task instead of enqueueing it
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, NULL);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task * new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            op->bypass_t = NULL;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Creates tasks for postponed messages if available and if concurrency allows
    void internal_forward(operation_type *op) {
        op->bypass_t = NULL;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if(op->bypass_t)
            op->status.store(SUCCEEDED, std::memory_order_release);
        else {
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    graph_task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return NULL;
    }

    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) {
        if( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) {
        if( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }
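
    // Dispatch summary for the two try_put_task_impl overloads above:
    //  - lightweight, unlimited concurrency: run the body inline; no task.
    //  - lightweight, limited concurrency: try to occupy a concurrency slot
    //    and run inline; otherwise fall back to the aggregator, which queues
    //    or rejects the item.
    //  - non-lightweight: always produce a task, directly when concurrency is
    //    unlimited and through the aggregator otherwise.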

    //! Applies the body to the provided input
    //  then decides if more work is available
    graph_task* apply_body_bypass( const input_type &i ) {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

    //! Allocates a task to apply a body
    graph_task* create_body_task( const input_type &input ) {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        // TODO revamp: extract helper for common graph task allocation part
        small_object_allocator allocator{};
        typedef apply_body_task_bypass<class_type, input_type> task_type;
        graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! This is executed by an enqueued task, the "forwarder"
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = NULL;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if(op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! Spawns a task that calls forward()
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if(tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
};  // function_input_base
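
// Illustrative sketch (not part of this header): the lightweight path taken
// by try_put_task_impl(..., std::true_type) above is selected through the
// public API like this:
//
//   tbb::flow::graph g;
//   tbb::flow::function_node<int, int, tbb::flow::lightweight> n(
//       g, tbb::flow::unlimited, [](int v) { return v + 1; });
//   n.try_put(41);    // the body may execute on the calling thread,
//                     // with no task spawned for this message
//   g.wait_for_all();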

//! Implements methods for a function node that takes a type Input as input and sends
//  a type Output to its successors.
template< typename Input, typename Output, typename Policy, typename A>
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy,A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    template<typename Body>
    function_input(
        graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority)
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    //! Copy constructor
    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ) {
    }
#if __INTEL_COMPILER <= 2021
    // Suppress a superfluous diagnostic about the absence of the virtual
    // keyword on the destructor of a derived class whose base class declares
    // its destructor virtual.
    virtual
#endif
    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    output_type apply_body_impl( const input_type& i) {
        // An extra copy is needed here to capture the body execution
        // without the try_put.
        fgt_begin_body( my_body );
        output_type v = (*my_body)(i);
        fgt_end_body( my_body );
        return v;
    }

    //TODO: consider moving into the base class
    graph_task* apply_body_impl_bypass( const input_type &i) {
        output_type v = apply_body_impl(i);
        graph_task* postponed_task = NULL;
        if( base_type::my_max_concurrency != 0 ) {
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
        }
        if( postponed_task ) {
            // make the task available for other workers since we do not know successors'
            // execution policy
            spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        graph_task* successor_task = successors().try_put_task(v);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if(!successor_task) {
                // Report success anyway, since this node's body has already
                // been executed.
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
    }

protected:

    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;
    function_body_type *my_init_body;
    virtual broadcast_cache<output_type > &successors() = 0;

};  // function_input
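
// Illustrative sketch (not part of this header): copy_function_object() above
// is what the public tbb::flow::copy_body() helper relies on to retrieve a
// copy of a node's body, e.g. to inspect state accumulated by a stateful body:
//
//   struct Counter {
//       int calls = 0;
//       int operator()(int v) { ++calls; return v; }
//   };
//   tbb::flow::function_node<int, int> n(g, tbb::flow::serial, Counter{});
//   // ... feed the node, then g.wait_for_all() ...
//   Counter snapshot = tbb::flow::copy_body<Counter>(n);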


// Helper templates to clear the successor edges of the output ports of a multifunction_node.
template<int N> struct clear_element {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        if(std::get<N-1>(p).successors().empty())
            return clear_element<N-1>::this_empty(p);
        return false;
    }
#endif
};

template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<0>(p).successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        return std::get<0>(p).successors().empty();
    }
#endif
};
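
// The recursion above unrolls at compile time: clear_element<3>::clear_this(p)
// clears the successors of ports 2, 1 and 0, in that order, and the
// specialization for 1 terminates the chain.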

template <typename OutputTuple>
struct init_output_ports {
    template <typename... Args>
    static OutputTuple call(graph& g, const std::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
}; // struct init_output_ports
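
// Illustrative expansion: for OutputTuple = std::tuple<P0, P1>, call(g, t)
// returns OutputTuple(P0(g), P1(g)), constructing every output port from the
// owning graph. Only the argument's element types are used, never its value,
// which is why the constructors below can safely pass the not-yet-initialized
// my_output_ports member.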

//! Implements methods for a function node that takes a type Input as input
//  and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename Policy, typename A>
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = std::tuple_size<OutputPortSet>::value;
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    template<typename Body>
    multifunction_input(graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority)
      , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)) {
    }

    //! Copy constructor
    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    // For multifunction nodes there is no single successor as such, so we just
    // tell the task we were successful.
    //TODO: consider moving common parts with implementation in function_input into separate function
    graph_task* apply_body_impl_bypass( const input_type &i ) {
        fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        fgt_end_body( my_body );
        graph_task* ttask = NULL;
        if(base_type::my_max_concurrency != 0) {
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports() { return my_output_ports; }

protected:

    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if (f & rf_clear_edges) clear_element<N>::clear_this(my_output_ports);
        if (f & rf_reset_bodies) {
            multifunction_body_type* tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;
    multifunction_body_type *my_init_body;
    output_ports_type my_output_ports;

};  // multifunction_input

// template to refer to an output port of a multifunction_node
template<size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    return std::get<N>(op.output_ports());
}
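
// Usage sketch (public API, illustrative): connecting a specific output port
// of a multifunction_node to a successor goes through output_port<N>():
//
//   using mf_t = tbb::flow::multifunction_node<int, std::tuple<int, float>>;
//   mf_t fan_out(g, tbb::flow::unlimited,
//       [](const int& v, mf_t::output_ports_type& ports) {
//           std::get<0>(ports).try_put(v);
//           std::get<1>(ports).try_put(static_cast<float>(v));
//       });
//   tbb::flow::make_edge(tbb::flow::output_port<0>(fan_out), int_consumer);
//   tbb::flow::make_edge(tbb::flow::output_port<1>(fan_out), float_consumer);
//
// where int_consumer and float_consumer are hypothetical receiver nodes.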

inline void check_task_and_spawn(graph& g, graph_task* t) {
    if (t && t != SUCCESSFULLY_ENQUEUED) {
        spawn_in_graph_arena(g, *t);
    }
}

// helper structs for split_node
template<int N>
struct emit_element {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        // TODO: consider collecting all the tasks in a task_list and spawning them all at once
        graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g,t,p);
    }
};

template<>
struct emit_element<1> {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }
};
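
// Illustrative sketch (public API): emit_element drives split_node, which
// forwards each element of an incoming tuple to the matching output port:
//
//   tbb::flow::split_node<std::tuple<int, float>> s(g);
//   tbb::flow::make_edge(tbb::flow::output_port<0>(s), int_consumer);
//   tbb::flow::make_edge(tbb::flow::output_port<1>(s), float_consumer);
//   s.try_put(std::make_tuple(1, 2.5f));   // delivers 1 and 2.5f separately
//
// (int_consumer and float_consumer are hypothetical successor nodes.)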

//! Implements methods for an executable node that takes continue_msg as input
template< typename Output, typename Policy>
class continue_input : public continue_receiver {
public:

    //! The input type of this receiver
    typedef continue_msg input_type;

    //! The output type of this receiver
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    template< typename Body >
    continue_input( graph &g, Body& body, node_priority_t a_priority )
        : continue_receiver(/*number_of_predecessors=*/0, a_priority)
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    Body& body, node_priority_t a_priority )
      : continue_receiver( number_of_predecessors, a_priority )
      , my_graph_ref(g)
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    continue_input( const continue_input& src ) : continue_receiver(src),
                                                  my_graph_ref(src.my_graph_ref),
                                                  my_body( src.my_init_body->clone() ),
                                                  my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    void reset_receiver( reset_flags f) override {
        continue_receiver::reset_receiver(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:

    graph& my_graph_ref;
    function_body_type *my_body;
    function_body_type *my_init_body;

    virtual broadcast_cache<output_type > &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;

    //! Applies the body to the provided input
    graph_task* apply_body_bypass( input_type ) {
        // An extra copy is needed here to capture the body execution
        // without the try_put.
        fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        fgt_end_body( my_body );
        return successors().try_put_task( v );
    }

    graph_task* execute() override {
        if(!is_graph_active(my_graph_ref)) {
            return NULL;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() );
        }
        else {
            small_object_allocator allocator{};
            typedef apply_body_task_bypass<class_type, continue_msg> task_type;
            graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
            graph_reference().reserve_wait();
            return t;
        }
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }
};  // continue_input
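
// Usage sketch (public API, illustrative): continue_input is the input side
// of tbb::flow::continue_node, which fires once every predecessor has sent a
// continue_msg:
//
//   tbb::flow::graph g;
//   tbb::flow::broadcast_node<tbb::flow::continue_msg> start(g);
//   tbb::flow::continue_node<int> make_answer(g,
//       [](const tbb::flow::continue_msg&) { return 42; });
//   tbb::flow::make_edge(start, make_answer);
//   start.try_put(tbb::flow::continue_msg());
//   g.wait_for_all();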

//! Implements methods for both executable and function nodes that put Output to their successors
template< typename Output >
class function_output : public sender<Output> {
public:

    template<int N> friend struct clear_element;
    typedef Output output_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef broadcast_cache<output_type> broadcast_cache_type;

    function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
    function_output(const function_output& other) = delete;

    //! Adds a new successor to this node
    bool register_successor( successor_type &r ) override {
        successors().register_successor( r );
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        successors().remove_successor( r );
        return true;
    }

    broadcast_cache_type &successors() { return my_successors; }

    graph& graph_reference() const { return my_graph_ref; }
protected:
    broadcast_cache_type my_successors;
    graph& my_graph_ref;
};  // function_output
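
// Note (descriptive): tbb::flow::make_edge(pred, succ) ultimately calls
// register_successor() above on the predecessor's function_output side, and
// remove_edge() correspondingly calls remove_successor().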

template< typename Output >
class multifunction_output : public function_output<Output> {
public:
    typedef Output output_type;
    typedef function_output<output_type> base_type;
    using base_type::my_successors;

    multifunction_output(graph& g) : base_type(g) {}
    multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}

    bool try_put(const output_type &i) {
        graph_task *res = try_put_task(i);
        if( !res ) return false;
        if( res != SUCCESSFULLY_ENQUEUED ) {
            // wrapping in task_arena::execute() is not needed since the method is called from
            // inside task::execute()
            spawn_in_graph_arena(graph_reference(), *res);
        }
        return true;
    }

    using base_type::graph_reference;

protected:

    graph_task* try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

    template <int N> friend struct emit_element;

};  // multifunction_output
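
// Note (descriptive): multifunction_output::try_put() is what runs when a
// multifunction body writes to one of its ports, as in
// std::get<0>(ports).try_put(v); any successor task it produces is spawned
// in the graph arena immediately.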

// composite_node helpers
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    void *addr = const_cast<NodeType1 *>(&n1);

    fgt_alias_port(c_node, addr, visible);
    add_nodes_impl(c_node, visible, n...);
}
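
// Illustrative sketch: add_nodes_impl() peels one node off the pack per
// recursive call, registering each inner node with the tracing layer. It
// backs the add_nodes()/add_visible_nodes() members of the public
// tbb::flow::composite_node, e.g. (inside a class derived from
// composite_node): add_visible_nodes(inner_a, inner_b); where inner_a and
// inner_b are hypothetical member nodes.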

#endif // __TBB__flow_graph_node_impl_H