1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_node_impl_H
18 #define __TBB__flow_graph_node_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 #include "_flow_graph_item_buffer_impl.h"
25 
26 template< typename T, typename A >
27 class function_input_queue : public item_buffer<T,A> {
28 public:
29     bool empty() const {
30         return this->buffer_empty();
31     }
32 
33     const T& front() const {
34         return this->item_buffer<T, A>::front();
35     }
36 
37     void pop() {
38         this->destroy_front();
39     }
40 
41     bool push( T& t ) {
42         return this->push_back( t );
43     }
44 };
45 
//! Input and scheduling for a function node that takes a type Input as input
//  The only up-ref is apply_body_impl, which should implement the function
//  call and any handling of the result.
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    // Operation codes serviced serially by the aggregator (see handle_operations).
    enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
    };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    //! The input type of this receiver
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    // queueing and rejecting are mutually exclusive buffering policies.
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

    //! Constructor for function_input_base
    // A queue is allocated only for non-rejecting policies; a rejecting node
    // refuses inputs that cannot be processed immediately.
    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : NULL)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    //! Copy constructor
    // Copies configuration only (graph, concurrency limit, priority); queued
    // items, cached predecessors and in-flight state are not duplicated.
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority) {}

    //! Destructor
    // The queue is allocated by the constructor for {multi}function_node.
    // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
    // This would be an interface-breaking change.
    virtual ~function_input_base() {
        delete my_queue;
        my_queue = nullptr;
    }

    //! Accepts an input item; dispatches on the lightweight policy at compile time.
    graph_task* try_put_task( const input_type& t) override {
        return try_put_task_impl(t, has_policy<lightweight, Policy>());
    }

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:

    //! Resets concurrency bookkeeping, the input queue, and the predecessor cache.
    void reset_function_input_base( reset_flags f) {
        my_concurrency = 0;
        if(my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;  // 0 means unlimited concurrency
    size_t my_concurrency;            // number of bodies currently executing
    node_priority_t my_priority;
    input_queue_type *my_queue;       // NULL for the rejecting policy
    predecessor_cache<input_type, null_mutex > my_predecessors;

    //! Clears or resets the predecessor cache depending on the flags.
    void reset_receiver( reset_flags f) {
        if( f & rf_clear_edges) my_predecessors.clear();
        else
            my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    //! Called after a body finishes: releases one concurrency slot and, when an
    //  item is queued or available from a predecessor, returns a task for it.
    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

    //! Record for an aggregated operation; the active union member depends on type.
    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;        // used by tryput_bypass / app_body_bypass
            predecessor_type *r;     // used by reg_pred / rem_pred
        };
        graph_task* bypass_t;        // task handed back to the requester, if any
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)) {}
        operation_type(op_type t) : type(char(t)), r(NULL) {}
    };

    bool forwarder_busy;
    typedef aggregating_functor<class_type, operation_type> handler_type;
    friend class aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

    //! Takes one postponed item (from the queue, or pulled from a predecessor)
    //  and returns a body task for it, claiming a concurrency slot.
    graph_task* perform_queued_requests() {
        graph_task* new_task = NULL;
        if(my_queue) {
            if(!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());

                my_queue->pop();
            }
        }
        else {
            input_type i;
            if(my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
    //! Aggregator callback: services the batched list of operations serially.
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                // A new predecessor may have items to offer; ensure a forwarder runs.
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            case app_body_bypass: {
                tmp->bypass_t = NULL;
                __TBB_ASSERT(my_max_concurrency != 0, NULL);
                // A body completed: free its slot and try to start postponed work.
                --my_concurrency;
                if(my_concurrency<my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                tmp->status.store(SUCCEEDED, std::memory_order_release);
            }
                break;
            case tryput_bypass: internal_try_put_task(tmp);  break;
            case try_fwd: internal_forward(tmp);  break;
            case occupy_concurrency:
                // Reserve a concurrency slot without submitting an item
                // (used by the lightweight try_put path).
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                } else {
                    tmp->status.store(FAILED, std::memory_order_release);
                }
                break;
            }
        }
    }

    //! Put to the node, but return the task instead of enqueueing it
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, NULL);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task * new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            // Item buffered for later; report success via the sentinel task value.
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            // Rejecting policy (no queue) or failed push: the put fails.
            op->bypass_t = NULL;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Creates tasks for postponed messages if available and if concurrency allows
    void internal_forward(operation_type *op) {
        op->bypass_t = NULL;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if(op->bypass_t)
            op->status.store(SUCCEEDED, std::memory_order_release);
        else {
            // Nothing to forward; the forwarder goes idle until re-spawned.
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Submits an item through the aggregator and returns the resulting task.
    graph_task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return NULL;
    }

    //! Lightweight policy: execute the body on the caller's thread when a
    //  concurrency slot is available, avoiding a task spawn.
    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) {
        if( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    //! Non-lightweight policy: always produce a task to run the body.
    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) {
        if( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }

    //! Applies the body to the provided input
    //  then decides if more work is available
    graph_task* apply_body_bypass( const input_type &i ) {
        // CRTP dispatch into the derived implementation.
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

    //! allocates a task to apply a body
    // Returns nullptr when the graph is no longer active (e.g. cancelled).
    graph_task* create_body_task( const input_type &input ) {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        // TODO revamp: extract helper for common graph task allocation part
        small_object_allocator allocator{};
        typedef apply_body_task_bypass<class_type, input_type> task_type;
        graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! This is executed by an enqueued task, the "forwarder"
    // Drains postponed items while concurrency permits, combining the
    // resulting body tasks into a single bypass task.
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = NULL;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if(op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, NULL );
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    //! Allocates the forwarder task; nullptr when the graph is inactive.
    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! Spawns a task that calls forward()
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if(tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
};  // function_input_base
343 
//! Implements methods for a function node that takes a type Input as input and sends
//  a type Output to its successors.
template< typename Input, typename Output, typename Policy, typename A>
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy,A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    // my_init_body keeps a pristine copy of the user body so that
    // reset_function_input(rf_reset_bodies) can restore the initial state.
    template<typename Body>
    function_input(
        graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority)
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    //! Copy constructor
    // Both bodies are cloned from the source's *initial* body, so the copy
    // starts from the same state as a freshly constructed node.
    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ) {
    }
#if __INTEL_COMPILER <= 2021
    // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
    // class while the parent class has the virtual keyword for the destructor.
    virtual
#endif
    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    //! Returns a copy of the current body object (requires the dynamic type to match Body).
    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    //! Invokes the user body on the input, bracketed by tracing hooks.
    output_type apply_body_impl( const input_type& i) {
        // There is an extra copy needed to capture the
        // body execution without the try_put
        fgt_begin_body( my_body );
        output_type v = (*my_body)(i);
        fgt_end_body( my_body );
        return v;
    }

    //TODO: consider moving into the base class
    //! Runs the body, spawns any postponed-item task, and forwards the result
    //  to the successors, returning their task (or a sentinel for lightweight).
    graph_task* apply_body_impl_bypass( const input_type &i) {
        output_type v = apply_body_impl(i);
        graph_task* postponed_task = NULL;
        if( base_type::my_max_concurrency != 0 ) {
            // This body is done; see whether a postponed input can start now.
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, NULL );
        }
        if( postponed_task ) {
            // make the task available for other workers since we do not know successors'
            // execution policy
            spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        graph_task* successor_task = successors().try_put_task(v);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if(!successor_task) {
                // Return confirmative status since current
                // node's body has been executed anyway
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
    }

protected:

    //! Resets base-class state and, if requested, restores the initial body.
    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;       // current (possibly stateful) body
    function_body_type *my_init_body;  // pristine clone used for reset
    virtual broadcast_cache<output_type > &successors() = 0;

};  // function_input
443 
444 
// helper templates to clear the successor edges of the output ports of an multifunction_node
template<int N> struct clear_element {
    //! Clears the successor edges of port N-1, then recurses down to port 0.
    template<typename P> static void clear_this(P &p) {
        (void)std::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    //! True only if ports N-1 down to 0 all have empty successor lists.
    template<typename P> static bool this_empty(P &p) {
        return std::get<N-1>(p).successors().empty()
            && clear_element<N-1>::this_empty(p);
    }
#endif
};
459 
460 template<> struct clear_element<1> {
461     template<typename P> static void clear_this(P &p) {
462         (void)std::get<0>(p).successors().clear();
463     }
464 #if TBB_USE_ASSERT
465     template<typename P> static bool this_empty(P &p) {
466         return std::get<0>(p).successors().empty();
467     }
468 #endif
469 };
470 
template <typename OutputTuple>
struct init_output_ports {
    // Constructs each output port in the tuple from the owning graph.
    // The tuple argument is used only to deduce the ports' types (Args...);
    // its value is never read, so passing a not-yet-constructed member is safe.
    template <typename... Args>
    static OutputTuple call(graph& g, const std::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
}; // struct init_output_ports
478 
//! Implements methods for a function node that takes a type Input as input
//  and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename Policy, typename A>
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = std::tuple_size<OutputPortSet>::value;
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    // my_init_body keeps a pristine copy of the user body so that
    // reset(rf_reset_bodies) can restore the initial state.
    template<typename Body>
    multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority)
      , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      // NOTE: my_output_ports is passed to call() before it is constructed; the
      // argument is used only for type deduction and is never read (see init_output_ports).
      , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
    }

    //! Copy constructor
    // Both bodies are cloned from the source's *initial* body; the output
    // ports are freshly constructed against the source's graph.
    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    //! Returns a copy of the current body object (requires the dynamic type to match Body).
    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    // for multifunction nodes we do not have a single successor as such.  So we just tell
    // the task we were successful.
    //TODO: consider moving common parts with implementation in function_input into separate function
    graph_task* apply_body_impl_bypass( const input_type &i ) {
        fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        fgt_end_body( my_body );
        graph_task* ttask = NULL;
        if(base_type::my_max_concurrency != 0) {
            // This body is done; see whether a postponed input can start now.
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports(){ return my_output_ports; }

protected:

    //! Resets base state, clears output-port edges, and/or restores the initial
    //  body, depending on the flags.
    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
        if(f & rf_reset_bodies) {
            multifunction_body_type* tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;       // current (possibly stateful) body
    multifunction_body_type *my_init_body;  // pristine clone used for reset
    output_ports_type my_output_ports;

};  // multifunction_input
554 
555 // template to refer to an output port of a multifunction_node
556 template<size_t N, typename MOP>
557 typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
558     return std::get<N>(op.output_ports());
559 }
560 
561 inline void check_task_and_spawn(graph& g, graph_task* t) {
562     if (t && t != SUCCESSFULLY_ENQUEUED) {
563         spawn_in_graph_arena(g, *t);
564     }
565 }
566 
567 // helper structs for split_node
568 template<int N>
569 struct emit_element {
570     template<typename T, typename P>
571     static graph_task* emit_this(graph& g, const T &t, P &p) {
572         // TODO: consider to collect all the tasks in task_list and spawn them all at once
573         graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
574         check_task_and_spawn(g, last_task);
575         return emit_element<N-1>::emit_this(g,t,p);
576     }
577 };
578 
579 template<>
580 struct emit_element<1> {
581     template<typename T, typename P>
582     static graph_task* emit_this(graph& g, const T &t, P &p) {
583         graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
584         check_task_and_spawn(g, last_task);
585         return SUCCESSFULLY_ENQUEUED;
586     }
587 };
588 
//! Implements methods for an executable node that takes continue_msg as input
template< typename Output, typename Policy>
class continue_input : public continue_receiver {
public:

    //! The input type of this receiver
    typedef continue_msg input_type;

    //! The output type of this receiver
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    // my_init_body keeps a pristine copy of the user body so that
    // reset_receiver(rf_reset_bodies) can restore the initial state.
    template< typename Body >
    continue_input( graph &g, Body& body, node_priority_t a_priority )
        : continue_receiver(/*number_of_predecessors=*/0, a_priority)
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    Body& body, node_priority_t a_priority )
      : continue_receiver( number_of_predecessors, a_priority )
      , my_graph_ref(g)
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    //! Copy constructor: both bodies are cloned from the source's *initial* body.
    continue_input( const continue_input& src ) : continue_receiver(src),
                                                  my_graph_ref(src.my_graph_ref),
                                                  my_body( src.my_init_body->clone() ),
                                                  my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    //! Returns a copy of the current body object (requires the dynamic type to match Body).
    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    //! Resets predecessor bookkeeping and, if requested, restores the initial body.
    void reset_receiver( reset_flags f) override {
        continue_receiver::reset_receiver(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:

    graph& my_graph_ref;
    function_body_type *my_body;       // current (possibly stateful) body
    function_body_type *my_init_body;  // pristine clone used for reset

    virtual broadcast_cache<output_type > &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;

    //! Applies the body to the provided input
    graph_task* apply_body_bypass( input_type ) {
        // There is an extra copy needed to capture the
        // body execution without the try_put
        fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        fgt_end_body( my_body );
        return successors().try_put_task( v );
    }

    //! Runs the body inline for the lightweight policy; otherwise allocates a
    //  task to run it. Returns NULL when the graph is no longer active.
    graph_task* execute() override {
        if(!is_graph_active(my_graph_ref)) {
            return NULL;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() );
        }
        else {
            small_object_allocator allocator{};
            typedef apply_body_task_bypass<class_type, continue_msg> task_type;
            graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
            graph_reference().reserve_wait();
            return t;
        }
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }
};  // continue_input
691 
692 //! Implements methods for both executable and function nodes that puts Output to its successors
693 template< typename Output >
694 class function_output : public sender<Output> {
695 public:
696 
697     template<int N> friend struct clear_element;
698     typedef Output output_type;
699     typedef typename sender<output_type>::successor_type successor_type;
700     typedef broadcast_cache<output_type> broadcast_cache_type;
701 
702     function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
703     function_output(const function_output& other) = delete;
704 
705     //! Adds a new successor to this node
706     bool register_successor( successor_type &r ) override {
707         successors().register_successor( r );
708         return true;
709     }
710 
711     //! Removes a successor from this node
712     bool remove_successor( successor_type &r ) override {
713         successors().remove_successor( r );
714         return true;
715     }
716 
717     broadcast_cache_type &successors() { return my_successors; }
718 
719     graph& graph_reference() const { return my_graph_ref; }
720 protected:
721     broadcast_cache_type my_successors;
722     graph& my_graph_ref;
723 };  // function_output
724 
725 template< typename Output >
726 class multifunction_output : public function_output<Output> {
727 public:
728     typedef Output output_type;
729     typedef function_output<output_type> base_type;
730     using base_type::my_successors;
731 
732     multifunction_output(graph& g) : base_type(g) {}
733     multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}
734 
735     bool try_put(const output_type &i) {
736         graph_task *res = try_put_task(i);
737         if( !res ) return false;
738         if( res != SUCCESSFULLY_ENQUEUED ) {
739             // wrapping in task_arena::execute() is not needed since the method is called from
740             // inside task::execute()
741             spawn_in_graph_arena(graph_reference(), *res);
742         }
743         return true;
744     }
745 
746     using base_type::graph_reference;
747 
748 protected:
749 
750     graph_task* try_put_task(const output_type &i) {
751         return my_successors.try_put_task(i);
752     }
753 
754     template <int N> friend struct emit_element;
755 
756 };  // multifunction_output
757 
//composite_node
// Recursion terminator for add_nodes_impl: no nodes left to register.
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}
761 
//! Registers each node's address as an aliased port of the composite node,
//  one per recursion step.
template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    // Strip constness so the node's address can be handed to the tracing hook.
    void *node_addr = const_cast<NodeType1 *>(&n1);
    fgt_alias_port(c_node, node_addr, visible);
    // Recurse over the remaining nodes.
    add_nodes_impl(c_node, visible, n...);
}
769 
770 #endif // __TBB__flow_graph_node_impl_H
771