1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_node_impl_H
18 #define __TBB__flow_graph_node_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 #include "_flow_graph_item_buffer_impl.h"
25 
26 template< typename T, typename A >
27 class function_input_queue : public item_buffer<T,A> {
28 public:
empty()29     bool empty() const {
30         return this->buffer_empty();
31     }
32 
front()33     const T& front() const {
34         return this->item_buffer<T, A>::front();
35     }
36 
pop()37     void pop() {
38         this->destroy_front();
39     }
40 
push(T & t)41     bool push( T& t ) {
42         return this->push_back( t );
43     }
44 };
45 
46 //! Input and scheduling for a function node that takes a type Input as input
47 //  The only up-ref is apply_body_impl, which should implement the function
48 //  call and any handling of the result.
//! Input and scheduling for a function node that takes a type Input as input
//  The only up-ref is apply_body_impl, which should implement the function
//  call and any handling of the result.
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    // Operation codes handled by the aggregator: predecessor add/remove,
    // forwarding of postponed items, a bypassing put, completion of a body
    // invocation, and a concurrency-slot reservation for lightweight puts.
    enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
    };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    //! The input type of this receiver
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    // queueing and rejecting are mutually exclusive buffering policies.
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

    //! Constructor for function_input_base
    // An input queue is allocated for every non-rejecting policy; a rejecting
    // node refuses items instead of buffering them (my_queue stays null).
    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    //! Copy constructor: copies configuration only; buffered items,
    //  predecessors, and in-flight state are not carried over.
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

    //! Destructor
    // The queue is allocated by the constructor for {multi}function_node.
    // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead.
    // This would be an interface-breaking change.
    virtual ~function_input_base() {
        delete my_queue;
        my_queue = nullptr;
    }

    //! Puts an item to the node; returns the task to execute (possibly
    //  nullptr or SUCCESSFULLY_ENQUEUED). The lightweight (execute-in-place)
    //  path is taken only when the body is known not to throw.
    graph_task* try_put_task( const input_type& t) override {
        if ( my_is_no_throw )
            return try_put_task_impl(t, has_policy<lightweight, Policy>());
        else
            return try_put_task_impl(t, std::false_type());
    }

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:

    //! Resets concurrency bookkeeping, the input queue (if any), and the
    //  predecessor cache; f controls whether edges are cleared as well.
    void reset_function_input_base( reset_flags f) {
        my_concurrency = 0;
        if(my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;   // 0 means unlimited concurrency
    size_t my_concurrency;             // body invocations currently in flight
    node_priority_t my_priority;
    const bool my_is_no_throw;         // true when the body is noexcept
    input_queue_type *my_queue;        // owned; null for the rejecting policy
    predecessor_cache<input_type, null_mutex > my_predecessors;

    void reset_receiver( reset_flags f) {
        if( f & rf_clear_edges) my_predecessors.clear();
        else
            my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    //! Called after a body invocation finishes: releases the concurrency slot
    //  and returns a task for the next postponed item, if any.
    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

    //! A record queued to the aggregator; at most one of elem/r is meaningful,
    //  selected by `type`.
    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;        // item for tryput_bypass / app_body_bypass
            predecessor_type *r;     // predecessor for reg_pred / rem_pred
        };
        graph_task* bypass_t;        // task produced by the handler, if any
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr) {}
        operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
    };

    bool forwarder_busy;   // true while a forwarder task is pending or running
    typedef aggregating_functor<class_type, operation_type> handler_type;
    friend class aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

    //! Takes the next postponed input (from the queue, or pulled from a
    //  predecessor when there is no queue) and wraps it in a body task,
    //  occupying a concurrency slot. Called only under the aggregator.
    graph_task* perform_queued_requests() {
        graph_task* new_task = nullptr;
        if(my_queue) {
            if(!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());

                my_queue->pop();
            }
        }
        else {
            input_type i;
            if(my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
    //! Aggregator handler: drains the pending operation list serially, so the
    //  state it touches needs no additional locking.
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                // A new predecessor may have items to offer; make sure a
                // forwarder task is running.
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            case app_body_bypass: {
                tmp->bypass_t = nullptr;
                __TBB_ASSERT(my_max_concurrency != 0, nullptr);
                // A body invocation completed: free its slot and, if there is
                // now room, start work on the next postponed item.
                --my_concurrency;
                if(my_concurrency<my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                tmp->status.store(SUCCEEDED, std::memory_order_release);
            }
                break;
            case tryput_bypass: internal_try_put_task(tmp);  break;
            case try_fwd: internal_forward(tmp);  break;
            case occupy_concurrency:
                // Reserve a slot for a lightweight put without creating a task.
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                } else {
                    tmp->status.store(FAILED, std::memory_order_release);
                }
                break;
            }
        }
    }

    //! Put to the node, but return the task instead of enqueueing it
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, nullptr);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task * new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            // No slot available, but the item was buffered for later.
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            // Rejecting policy (or the queue refused): caller keeps the item.
            op->bypass_t = nullptr;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Creates tasks for postponed messages if available and if concurrency allows
    void internal_forward(operation_type *op) {
        op->bypass_t = nullptr;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if(op->bypass_t)
            op->status.store(SUCCEEDED, std::memory_order_release);
        else {
            // Nothing left to forward; let a future reg_pred respawn us.
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    graph_task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return nullptr;
    }

    //! Lightweight path: run the body inline when a concurrency slot can be
    //  reserved (or concurrency is unlimited); otherwise fall back to the
    //  regular task-producing path.
    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) {
        if( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    //! Regular path: always produce a task that will run the body.
    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) {
        if( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }

    //! Applies the body to the provided input
    //  then decides if more work is available
    graph_task* apply_body_bypass( const input_type &i ) {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

    //! allocates a task to apply a body
    // Returns nullptr when the graph is shutting down. The allocation is
    // paired with reserve_wait() so graph::wait_for_all accounts for the task.
    graph_task* create_body_task( const input_type &input ) {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        // TODO revamp: extract helper for common graph task allocation part
        small_object_allocator allocator{};
        typedef apply_body_task_bypass<class_type, input_type> task_type;
        graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! This is executed by an enqueued task, the "forwarder"
    // Repeatedly asks the aggregator for postponed work, combining the
    // produced body tasks until a try_fwd attempt fails.
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = nullptr;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if(op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr);
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! Spawns a task that calls forward()
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if(tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
};  // function_input_base
347 
348 //! Implements methods for a function node that takes a type Input as input and sends
349 //  a type Output to its successors.
//! Implements methods for a function node that takes a type Input as input and sends
//  a type Output to its successors.
template< typename Input, typename Output, typename Policy, typename A>
class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef function_input<Input, Output, Policy,A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    // Two copies of the body are kept: my_body is the working copy; my_init_body
    // preserves the initial state so reset(rf_reset_bodies) can restore it.
    template<typename Body>
    function_input(
        graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
    }

    //! Copy constructor; clones the source's *initial* body, not its current state.
    function_input( const function_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ) {
    }
#if __INTEL_COMPILER <= 2021
    // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
    // class while the parent class has the virtual keyword for the destructor.
    virtual
#endif
    ~function_input() {
        delete my_body;
        delete my_init_body;
    }

    //! Returns a copy of the current body object; the dynamic_cast throws
    //  std::bad_cast if Body does not match the stored body type.
    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *this->my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    //! Invokes the body on i, bracketed by tracing hooks, and returns its result.
    output_type apply_body_impl( const input_type& i) {
        // There is an extra copy needed to capture the
        // body execution without the try_put
        fgt_begin_body( my_body );
        output_type v = tbb::detail::invoke(*my_body, i);
        fgt_end_body( my_body );
        return v;
    }

    //TODO: consider moving into the base class
    //! Runs the body, releases the concurrency slot (spawning any postponed
    //  work), then forwards the result to successors and returns the successor
    //  task for bypass.
    graph_task* apply_body_impl_bypass( const input_type &i) {
        output_type v = apply_body_impl(i);
        graph_task* postponed_task = nullptr;
        if( base_type::my_max_concurrency != 0 ) {
            postponed_task = base_type::try_get_postponed_task(i);
            __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr);
        }
        if( postponed_task ) {
            // make the task available for other workers since we do not know successors'
            // execution policy
            spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
        }
        graph_task* successor_task = successors().try_put_task(v);
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            if(!successor_task) {
                // Return confirmative status since current
                // node's body has been executed anyway
                successor_task = SUCCESSFULLY_ENQUEUED;
            }
        }
        return successor_task;
    }

protected:

    //! Resets base-class state and, on rf_reset_bodies, replaces the working
    //  body with a clone of the initial one.
    void reset_function_input(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

    function_body_type *my_body;        // working copy; may accumulate state
    function_body_type *my_init_body;   // pristine copy used for reset/copy
    virtual broadcast_cache<output_type > &successors() = 0;

};  // function_input
447 
448 
449 // helper templates to clear the successor edges of the output ports of an multifunction_node
// Recursive case: clears the successor edges of port N-1, then recurses
// toward port 0 (terminates at the clear_element<1> specialization).
template<int N> struct clear_element {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<N-1>(p).successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    //! True only if ports N-1 .. 0 all have empty successor sets.
    template<typename P> static bool this_empty(P &p) {
        if(std::get<N-1>(p).successors().empty())
            return clear_element<N-1>::this_empty(p);
        return false;
    }
#endif
};
463 
// Base case of the recursion: handles port 0 only.
template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        (void)std::get<0>(p).successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        return std::get<0>(p).successors().empty();
    }
#endif
};
474 
//! Constructs an OutputTuple by passing the graph to each port's constructor.
//  The tuple argument is unused at runtime; it only drives deduction of Args...
template <typename OutputTuple>
struct init_output_ports {
    template <typename... Args>
    static OutputTuple call(graph& g, const std::tuple<Args...>&) {
        return OutputTuple(Args(g)...);
    }
}; // struct init_output_ports
482 
483 //! Implements methods for a function node that takes a type Input as input
484 //  and has a tuple of output ports specified.
//! Implements methods for a function node that takes a type Input as input
//  and has a tuple of output ports specified.
template< typename Input, typename OutputPortSet, typename Policy, typename A>
class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
public:
    static const int N = std::tuple_size<OutputPortSet>::value;   // number of output ports
    typedef Input input_type;
    typedef OutputPortSet output_ports_type;
    typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
    typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
    typedef function_input_base<Input, Policy, A, my_class> base_type;
    typedef function_input_queue<input_type, A> input_queue_type;

    // constructor
    // As in function_input, my_init_body keeps a pristine clone of the body so
    // reset(rf_reset_bodies) can restore the initial state.
    template<typename Body>
    multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
      : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
      , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
      , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
    }

    //! Copy constructor; clones the source's initial body and creates fresh
    //  output ports bound to the same graph.
    multifunction_input( const multifunction_input& src ) :
        base_type(src),
        my_body( src.my_init_body->clone() ),
        my_init_body(src.my_init_body->clone() ),
        my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
    }

    ~multifunction_input() {
        delete my_body;
        delete my_init_body;
    }

    //! Returns a copy of the current body object; the dynamic_cast throws
    //  std::bad_cast if Body does not match the stored body type.
    template< typename Body >
    Body copy_function_object() {
        multifunction_body_type &body_ref = *this->my_body;
        return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
    }

    // for multifunction nodes we do not have a single successor as such.  So we just tell
    // the task we were successful.
    //TODO: consider moving common parts with implementation in function_input into separate function
    graph_task* apply_body_impl_bypass( const input_type &i ) {
        fgt_begin_body( my_body );
        (*my_body)(i, my_output_ports);
        fgt_end_body( my_body );
        graph_task* ttask = nullptr;
        if(base_type::my_max_concurrency != 0) {
            // Release the concurrency slot and pick up postponed work, if any.
            ttask = base_type::try_get_postponed_task(i);
        }
        return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
    }

    output_ports_type &output_ports(){ return my_output_ports; }

protected:

    //! Resets base state; optionally clears output-port edges and/or restores
    //  the initial body, per the flags in f.
    void reset(reset_flags f) {
        base_type::reset_function_input_base(f);
        if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
        if(f & rf_reset_bodies) {
            multifunction_body_type* tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
    }

    multifunction_body_type *my_body;        // working copy
    multifunction_body_type *my_init_body;   // pristine copy used for reset/copy
    output_ports_type my_output_ports;

};  // multifunction_input
558 
559 // template to refer to an output port of a multifunction_node
560 template<size_t N, typename MOP>
561 typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
562     return std::get<N>(op.output_ports());
563 }
564 
565 inline void check_task_and_spawn(graph& g, graph_task* t) {
566     if (t && t != SUCCESSFULLY_ENQUEUED) {
567         spawn_in_graph_arena(g, *t);
568     }
569 }
570 
571 // helper structs for split_node
// Recursive case: puts element N-1 of the tuple to output port N-1 (spawning
// any resulting task immediately), then recurses toward element 0.
template<int N>
struct emit_element {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        // TODO: consider to collect all the tasks in task_list and spawn them all at once
        graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
        check_task_and_spawn(g, last_task);
        return emit_element<N-1>::emit_this(g,t,p);
    }
};
582 
// Base case: emits element 0 and reports overall success to the caller.
template<>
struct emit_element<1> {
    template<typename T, typename P>
    static graph_task* emit_this(graph& g, const T &t, P &p) {
        graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
        check_task_and_spawn(g, last_task);
        return SUCCESSFULLY_ENQUEUED;
    }
};
592 
593 //! Implements methods for an executable node that takes continue_msg as input
//! Implements methods for an executable node that takes continue_msg as input
template< typename Output, typename Policy>
class continue_input : public continue_receiver {
public:

    //! The input type of this receiver
    typedef continue_msg input_type;

    //! The output type of this receiver
    typedef Output output_type;
    typedef function_body<input_type, output_type> function_body_type;
    typedef continue_input<output_type, Policy> class_type;

    // Keeps a working body (my_body) and a pristine clone (my_init_body) so
    // reset_receiver(rf_reset_bodies) can restore the initial state.
    template< typename Body >
    continue_input( graph &g, Body& body, node_priority_t a_priority )
        : continue_receiver(/*number_of_predecessors=*/0, a_priority)
        , my_graph_ref(g)
        , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
        , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    template< typename Body >
    continue_input( graph &g, int number_of_predecessors,
                    Body& body, node_priority_t a_priority )
      : continue_receiver( number_of_predecessors, a_priority )
      , my_graph_ref(g)
      , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
      , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
    { }

    //! Copy constructor; clones the source's *initial* body, not its current state.
    continue_input( const continue_input& src ) : continue_receiver(src),
                                                  my_graph_ref(src.my_graph_ref),
                                                  my_body( src.my_init_body->clone() ),
                                                  my_init_body( src.my_init_body->clone() ) {}

    ~continue_input() {
        delete my_body;
        delete my_init_body;
    }

    //! Returns a copy of the current body object; the dynamic_cast throws
    //  std::bad_cast if Body does not match the stored body type.
    template< typename Body >
    Body copy_function_object() {
        function_body_type &body_ref = *my_body;
        return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
    }

    //! Resets the predecessor count; on rf_reset_bodies also restores the
    //  initial body.
    void reset_receiver( reset_flags f) override {
        continue_receiver::reset_receiver(f);
        if(f & rf_reset_bodies) {
            function_body_type *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

protected:

    graph& my_graph_ref;
    function_body_type *my_body;        // working copy
    function_body_type *my_init_body;   // pristine copy used for reset/copy

    virtual broadcast_cache<output_type > &successors() = 0;

    friend class apply_body_task_bypass< class_type, continue_msg >;

    //! Applies the body to the provided input
    graph_task* apply_body_bypass( input_type ) {
        // There is an extra copy needed to capture the
        // body execution without the try_put
        fgt_begin_body( my_body );
        output_type v = (*my_body)( continue_msg() );
        fgt_end_body( my_body );
        return successors().try_put_task( v );
    }

    //! Called when all predecessors have signaled: either runs the body inline
    //  (lightweight policy) or allocates a task that will run it.
    graph_task* execute() override {
        if(!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (push)
#pragma warning (disable: 4127)  /* suppress conditional expression is constant */
#endif
        if(has_policy<lightweight, Policy>::value) {
#if _MSC_VER && !__INTEL_COMPILER
#pragma warning (pop)
#endif
            return apply_body_bypass( continue_msg() );
        }
        else {
            small_object_allocator allocator{};
            typedef apply_body_task_bypass<class_type, continue_msg> task_type;
            graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
            // Pair allocation with reserve_wait() so wait_for_all accounts for it.
            graph_reference().reserve_wait();
            return t;
        }
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }
};  // continue_input
695 
696 //! Implements methods for both executable and function nodes that puts Output to its successors
//! Implements methods for both executable and function nodes that puts Output to its successors
template< typename Output >
class function_output : public sender<Output> {
public:

    template<int N> friend struct clear_element;
    typedef Output output_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef broadcast_cache<output_type> broadcast_cache_type;

    function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
    function_output(const function_output& other) = delete;

    //! Adds a new successor to this node
    // Always reports success: the broadcast cache accepts any successor.
    bool register_successor( successor_type &r ) override {
        successors().register_successor( r );
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        successors().remove_successor( r );
        return true;
    }

    //! Access to the successor cache used to broadcast outputs.
    broadcast_cache_type &successors() { return my_successors; }

    graph& graph_reference() const { return my_graph_ref; }
protected:
    broadcast_cache_type my_successors;
    graph& my_graph_ref;
};  // function_output
728 
//! One output port of a multifunction_node; adds try_put on top of function_output.
template< typename Output >
class multifunction_output : public function_output<Output> {
public:
    typedef Output output_type;
    typedef function_output<output_type> base_type;
    using base_type::my_successors;

    multifunction_output(graph& g) : base_type(g) {}
    //! Copying binds to the same graph but starts with an empty successor set.
    multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}

    //! Puts an item to the successors; returns false if no successor accepted it.
    bool try_put(const output_type &i) {
        graph_task *res = try_put_task(i);
        if( !res ) return false;
        if( res != SUCCESSFULLY_ENQUEUED ) {
            // wrapping in task_arena::execute() is not needed since the method is called from
            // inside task::execute()
            spawn_in_graph_arena(graph_reference(), *res);
        }
        return true;
    }

    using base_type::graph_reference;

protected:

    graph_task* try_put_task(const output_type &i) {
        return my_successors.try_put_task(i);
    }

    template <int N> friend struct emit_element;

};  // multifunction_output
761 
762 //composite_node
// Recursion terminator for add_nodes_impl: no nodes left to register.
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}
765 
//! Registers the first node with the composite node's tracing layer (as an
//  aliased port), then recurses over the remaining nodes; recursion ends at
//  the no-argument overload above.
template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    fgt_alias_port(c_node, const_cast<NodeType1 *>(&n1), visible);
    add_nodes_impl(c_node, visible, n...);
}
773 
774 #endif // __TBB__flow_graph_node_impl_H
775