149e08aacStbbdev /*
2*a088cfa0SKonstantin Boyarinov     Copyright (c) 2005-2023 Intel Corporation
349e08aacStbbdev 
449e08aacStbbdev     Licensed under the Apache License, Version 2.0 (the "License");
549e08aacStbbdev     you may not use this file except in compliance with the License.
649e08aacStbbdev     You may obtain a copy of the License at
749e08aacStbbdev 
849e08aacStbbdev         http://www.apache.org/licenses/LICENSE-2.0
949e08aacStbbdev 
1049e08aacStbbdev     Unless required by applicable law or agreed to in writing, software
1149e08aacStbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1249e08aacStbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1349e08aacStbbdev     See the License for the specific language governing permissions and
1449e08aacStbbdev     limitations under the License.
1549e08aacStbbdev */
1649e08aacStbbdev 
1749e08aacStbbdev #ifndef __TBB__flow_graph_node_impl_H
1849e08aacStbbdev #define __TBB__flow_graph_node_impl_H
1949e08aacStbbdev 
2049e08aacStbbdev #ifndef __TBB_flow_graph_H
2149e08aacStbbdev #error Do not #include this internal file directly; use public TBB headers instead.
2249e08aacStbbdev #endif
2349e08aacStbbdev 
2449e08aacStbbdev #include "_flow_graph_item_buffer_impl.h"
2549e08aacStbbdev 
2649e08aacStbbdev template< typename T, typename A >
2749e08aacStbbdev class function_input_queue : public item_buffer<T,A> {
2849e08aacStbbdev public:
empty()2949e08aacStbbdev     bool empty() const {
3049e08aacStbbdev         return this->buffer_empty();
3149e08aacStbbdev     }
3249e08aacStbbdev 
front()3349e08aacStbbdev     const T& front() const {
3449e08aacStbbdev         return this->item_buffer<T, A>::front();
3549e08aacStbbdev     }
3649e08aacStbbdev 
pop()3749e08aacStbbdev     void pop() {
3849e08aacStbbdev         this->destroy_front();
3949e08aacStbbdev     }
4049e08aacStbbdev 
push(T & t)4149e08aacStbbdev     bool push( T& t ) {
4249e08aacStbbdev         return this->push_back( t );
4349e08aacStbbdev     }
4449e08aacStbbdev };
4549e08aacStbbdev 
4649e08aacStbbdev //! Input and scheduling for a function node that takes a type Input as input
4749e08aacStbbdev //  The only up-ref is apply_body_impl, which should implement the function
4849e08aacStbbdev //  call and any handling of the result.
//! Input and scheduling for a function node that takes a type Input as input
//  The only up-ref is apply_body_impl, which should implement the function
//  call and any handling of the result.
//
//  ImplType is the derived class (CRTP): apply_body_bypass() forwards to
//  ImplType::apply_body_impl_bypass().  All state changes that affect concurrency
//  accounting, the input queue and the predecessor cache go through my_aggregator,
//  which dispatches operation_type records to handle_operations().
//  NOTE(review): presumably the aggregator runs handle_operations for one batch at a
//  time, which is why my_concurrency / forwarder_busy are plain members and the
//  predecessor cache uses null_mutex — confirm against _aggregator.h.
template< typename Input, typename Policy, typename A, typename ImplType >
class function_input_base : public receiver<Input>, no_assign {
    // Kinds of requests routed through the aggregator (stored as char in operation_type::type).
    enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
    };
    typedef function_input_base<Input, Policy, A, ImplType> class_type;

public:

    //! The input type of this receiver
    typedef Input input_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
    typedef function_input_queue<input_type, A> input_queue_type;
    typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
    // A node cannot be both queueing and rejecting.
    static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");

    //! Constructor for function_input_base
    //  max_concurrency == 0 is treated as "unlimited" by try_put_task_impl (no aggregator round-trip).
    //  is_no_throw: whether the user body is known not to throw; only then may the
    //  lightweight policy execute the body inline (see try_put_task).
    //  An input queue is allocated unless the Policy is rejecting; rejecting nodes
    //  instead pull postponed items from their predecessors.
    function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
        : my_graph_ref(g), my_max_concurrency(max_concurrency)
        , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
        , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
        , my_predecessors(this)
        , forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
    }

    //! Copy constructor
    //  Copies only the configuration (graph, concurrency limit, priority, no-throw flag);
    //  queue contents, predecessors and the current concurrency count are not copied.
    function_input_base( const function_input_base& src )
        : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}

    //! Destructor
    // The queue (if any) is allocated by this class's constructor and owned by it.
    // NOTE(review): an older TODO here proposed passing the buffer policy to the base so it
    // could allocate the queue; the constructor above already does that, so it looks obsolete.
    virtual ~function_input_base() {
        delete my_queue;
        my_queue = nullptr;
    }

    //! Offers t to this node.  Bodies that may throw always take the non-lightweight path,
    //  so they are never executed inline on the caller's thread.
    graph_task* try_put_task( const input_type& t) override {
        if ( my_is_no_throw )
            return try_put_task_impl(t, has_policy<lightweight, Policy>());
        else
            return try_put_task_impl(t, std::false_type());
    }

    //! Adds src to the list of cached predecessors.
    bool register_predecessor( predecessor_type &src ) override {
        operation_type op_data(reg_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

    //! Removes src from the list of cached predecessors.
    bool remove_predecessor( predecessor_type &src ) override {
        operation_type op_data(rem_pred);
        op_data.r = &src;
        my_aggregator.execute(&op_data);
        return true;
    }

protected:

    //! Resets concurrency accounting, empties the queue (if any), resets or clears the
    //  predecessor cache according to f, and marks the forwarder as idle.
    void reset_function_input_base( reset_flags f) {
        my_concurrency = 0;
        if(my_queue) {
            my_queue->reset();
        }
        reset_receiver(f);
        forwarder_busy = false;
    }

    graph& my_graph_ref;
    const size_t my_max_concurrency;   // 0 means unlimited
    size_t my_concurrency;             // bodies currently running (or slots reserved)
    node_priority_t my_priority;
    const bool my_is_no_throw;
    input_queue_type *my_queue;        // nullptr for rejecting nodes
    predecessor_cache<input_type, null_mutex > my_predecessors;

    //! With rf_clear_edges the predecessor cache is cleared; otherwise it is reset.
    void reset_receiver( reset_flags f) {
        if( f & rf_clear_edges) my_predecessors.clear();
        else
            my_predecessors.reset();
        __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
    }

    graph& graph_reference() const override {
        return my_graph_ref;
    }

    //! Called after a body finishes: releases its concurrency slot and, if possible,
    //  returns a task for the next queued/postponed item (nullptr if none).
    graph_task* try_get_postponed_task(const input_type& i) {
        operation_type op_data(i, app_body_bypass);  // tries to pop an item or get_item
        my_aggregator.execute(&op_data);
        return op_data.bypass_t;
    }

private:

    friend class apply_body_task_bypass< class_type, input_type >;
    friend class forward_task_bypass< class_type >;

    //! One request submitted to the aggregator.  Either elem (the item being put) or
    //  r (a predecessor) is meaningful, depending on type; bypass_t carries a task
    //  (or SUCCESSFULLY_ENQUEUED) back to the requester.
    class operation_type : public aggregated_operation< operation_type > {
    public:
        char type;
        union {
            input_type *elem;
            predecessor_type *r;
        };
        graph_task* bypass_t;
        operation_type(const input_type& e, op_type t) :
            type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr) {}
        operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
    };

    // True while a forwarder task is outstanding; prevents spawning more than one.
    bool forwarder_busy;
    typedef aggregating_functor<class_type, operation_type> handler_type;
    friend class aggregating_functor<class_type, operation_type>;
    aggregator< handler_type, operation_type > my_aggregator;

    //! Starts work on one waiting item: the queue front for queueing nodes, or an item
    //  pulled from a predecessor for rejecting nodes.  Takes a concurrency slot when an
    //  item is found.  Returns the created task, or nullptr.
    //  Caller must have checked that a concurrency slot is available.
    graph_task* perform_queued_requests() {
        graph_task* new_task = nullptr;
        if(my_queue) {
            if(!my_queue->empty()) {
                ++my_concurrency;
                new_task = create_body_task(my_queue->front());

                my_queue->pop();
            }
        }
        else {
            input_type i;
            if(my_predecessors.get_item(i)) {
                ++my_concurrency;
                new_task = create_body_task(i);
            }
        }
        return new_task;
    }
    //! Aggregator handler: processes a linked batch of requests.  Each request's result
    //  fields are written before its status is published with a release store.
    void handle_operations(operation_type *op_list) {
        operation_type* tmp;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_pred:
                my_predecessors.add(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                // A new predecessor may have items to offer: make sure a forwarder runs.
                if (!forwarder_busy) {
                    forwarder_busy = true;
                    spawn_forward_task();
                }
                break;
            case rem_pred:
                my_predecessors.remove(*(tmp->r));
                tmp->status.store(SUCCEEDED, std::memory_order_release);
                break;
            case app_body_bypass: {
                // A body finished: release its slot and, if there is room, start the next item.
                tmp->bypass_t = nullptr;
                __TBB_ASSERT(my_max_concurrency != 0, nullptr);
                --my_concurrency;
                if(my_concurrency<my_max_concurrency)
                    tmp->bypass_t = perform_queued_requests();
                tmp->status.store(SUCCEEDED, std::memory_order_release);
            }
                break;
            case tryput_bypass: internal_try_put_task(tmp);  break;
            case try_fwd: internal_forward(tmp);  break;
            case occupy_concurrency:
                // Reserve a slot so a lightweight node can run the body inline.
                if (my_concurrency < my_max_concurrency) {
                    ++my_concurrency;
                    tmp->status.store(SUCCEEDED, std::memory_order_release);
                } else {
                    tmp->status.store(FAILED, std::memory_order_release);
                }
                break;
            }
        }
    }

    //! Put to the node, but return the task instead of enqueueing it
    //  Outcomes: a body task (slot taken), SUCCESSFULLY_ENQUEUED (item buffered in the
    //  queue), or FAILED with bypass_t == nullptr (no slot and no queue / push failed).
    //  Note: bypass_t may also be nullptr with SUCCEEDED if create_body_task declined
    //  because the graph is inactive.
    void internal_try_put_task(operation_type *op) {
        __TBB_ASSERT(my_max_concurrency != 0, nullptr);
        if (my_concurrency < my_max_concurrency) {
            ++my_concurrency;
            graph_task * new_task = create_body_task(*(op->elem));
            op->bypass_t = new_task;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else if ( my_queue && my_queue->push(*(op->elem)) ) {
            op->bypass_t = SUCCESSFULLY_ENQUEUED;
            op->status.store(SUCCEEDED, std::memory_order_release);
        } else {
            op->bypass_t = nullptr;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Creates tasks for postponed messages if available and if concurrency allows
    //  When nothing could be started, the forwarder is marked idle and FAILED is reported,
    //  which ends the loop in forward_task().
    void internal_forward(operation_type *op) {
        op->bypass_t = nullptr;
        if (my_concurrency < my_max_concurrency)
            op->bypass_t = perform_queued_requests();
        if(op->bypass_t)
            op->status.store(SUCCEEDED, std::memory_order_release);
        else {
            forwarder_busy = false;
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Submits a tryput_bypass request; returns its bypass_t on success, nullptr otherwise.
    graph_task* internal_try_put_bypass( const input_type& t ) {
        operation_type op_data(t, tryput_bypass);
        my_aggregator.execute(&op_data);
        if( op_data.status == SUCCEEDED ) {
            return op_data.bypass_t;
        }
        return nullptr;
    }

    //! Lightweight path: runs the body inline on the calling thread when the concurrency
    //  limit is 0 (unlimited) or a slot can be reserved; otherwise falls back to the regular
    //  put path (task creation, queueing or rejection).
    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) {
        if( my_max_concurrency == 0 ) {
            return apply_body_bypass(t);
        } else {
            operation_type check_op(t, occupy_concurrency);
            my_aggregator.execute(&check_op);
            if( check_op.status == SUCCEEDED ) {
                return apply_body_bypass(t);
            }
            return internal_try_put_bypass(t);
        }
    }

    //! Regular path: always hands the body to a task.  With an unlimited concurrency
    //  limit (0) no accounting is needed, so the aggregator is skipped.
    graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) {
        if( my_max_concurrency == 0 ) {
            return create_body_task(t);
        } else {
            return internal_try_put_bypass(t);
        }
    }

    //! Applies the body to the provided input
    //  then decides if more work is available
    graph_task* apply_body_bypass( const input_type &i ) {
        return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
    }

    //! allocates a task to apply a body
    //  Returns nullptr (allocating nothing) when the graph is not active; otherwise
    //  reserves a wait on the graph for the new task.
    graph_task* create_body_task( const input_type &input ) {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        // TODO revamp: extract helper for common graph task allocation part
        small_object_allocator allocator{};
        typedef apply_body_task_bypass<class_type, input_type> task_type;
        graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! This is executed by an enqueued task, the "forwarder"
    //  Repeatedly starts waiting items until internal_forward reports FAILED, combining
    //  all created tasks into the single returned task (via combine_tasks).
    graph_task* forward_task() {
        operation_type op_data(try_fwd);
        graph_task* rval = nullptr;
        do {
            op_data.status = WAIT;
            my_aggregator.execute(&op_data);
            if(op_data.status == SUCCEEDED) {
                graph_task* ttask = op_data.bypass_t;
                __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr);
                rval = combine_tasks(my_graph_ref, rval, ttask);
            }
        } while (op_data.status == SUCCEEDED);
        return rval;
    }

    //! Allocates a forwarder task; returns nullptr when the graph is not active.
    inline graph_task* create_forward_task() {
        if (!is_graph_active(my_graph_ref)) {
            return nullptr;
        }
        small_object_allocator allocator{};
        typedef forward_task_bypass<class_type> task_type;
        graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
        graph_reference().reserve_wait();
        return t;
    }

    //! Spawns a task that calls forward()
    //  NOTE(review): if the graph is inactive no task is spawned, yet the caller has already
    //  set forwarder_busy; it is cleared again by reset_function_input_base().
    inline void spawn_forward_task() {
        graph_task* tp = create_forward_task();
        if(tp) {
            spawn_in_graph_arena(graph_reference(), *tp);
        }
    }

    node_priority_t priority() const override { return my_priority; }
};  // function_input_base
34749e08aacStbbdev 
34849e08aacStbbdev //! Implements methods for a function node that takes a type Input as input and sends
34949e08aacStbbdev //  a type Output to its successors.
35049e08aacStbbdev template< typename Input, typename Output, typename Policy, typename A>
35149e08aacStbbdev class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
35249e08aacStbbdev public:
35349e08aacStbbdev     typedef Input input_type;
35449e08aacStbbdev     typedef Output output_type;
35549e08aacStbbdev     typedef function_body<input_type, output_type> function_body_type;
35649e08aacStbbdev     typedef function_input<Input, Output, Policy,A> my_class;
35749e08aacStbbdev     typedef function_input_base<Input, Policy, A, my_class> base_type;
35849e08aacStbbdev     typedef function_input_queue<input_type, A> input_queue_type;
35949e08aacStbbdev 
36049e08aacStbbdev     // constructor
36149e08aacStbbdev     template<typename Body>
function_input(graph & g,size_t max_concurrency,Body & body,node_priority_t a_priority)36249e08aacStbbdev     function_input(
36349e08aacStbbdev         graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
364*a088cfa0SKonstantin Boyarinov       : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
36549e08aacStbbdev       , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
36649e08aacStbbdev       , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
36749e08aacStbbdev     }
36849e08aacStbbdev 
36949e08aacStbbdev     //! Copy constructor
function_input(const function_input & src)37049e08aacStbbdev     function_input( const function_input& src ) :
37149e08aacStbbdev         base_type(src),
37249e08aacStbbdev         my_body( src.my_init_body->clone() ),
37349e08aacStbbdev         my_init_body(src.my_init_body->clone() ) {
37449e08aacStbbdev     }
37549e08aacStbbdev #if __INTEL_COMPILER <= 2021
37649e08aacStbbdev     // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited
37749e08aacStbbdev     // class while the parent class has the virtual keyword for the destrocutor.
37849e08aacStbbdev     virtual
37949e08aacStbbdev #endif
~function_input()38049e08aacStbbdev     ~function_input() {
38149e08aacStbbdev         delete my_body;
38249e08aacStbbdev         delete my_init_body;
38349e08aacStbbdev     }
38449e08aacStbbdev 
38549e08aacStbbdev     template< typename Body >
copy_function_object()38649e08aacStbbdev     Body copy_function_object() {
38749e08aacStbbdev         function_body_type &body_ref = *this->my_body;
38849e08aacStbbdev         return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
38949e08aacStbbdev     }
39049e08aacStbbdev 
apply_body_impl(const input_type & i)39149e08aacStbbdev     output_type apply_body_impl( const input_type& i) {
39249e08aacStbbdev         // There is an extra copied needed to capture the
39349e08aacStbbdev         // body execution without the try_put
39449e08aacStbbdev         fgt_begin_body( my_body );
395*a088cfa0SKonstantin Boyarinov         output_type v = tbb::detail::invoke(*my_body, i);
39649e08aacStbbdev         fgt_end_body( my_body );
39749e08aacStbbdev         return v;
39849e08aacStbbdev     }
39949e08aacStbbdev 
40049e08aacStbbdev     //TODO: consider moving into the base class
apply_body_impl_bypass(const input_type & i)40149e08aacStbbdev     graph_task* apply_body_impl_bypass( const input_type &i) {
40249e08aacStbbdev         output_type v = apply_body_impl(i);
40357f524caSIlya Isaev         graph_task* postponed_task = nullptr;
40449e08aacStbbdev         if( base_type::my_max_concurrency != 0 ) {
40549e08aacStbbdev             postponed_task = base_type::try_get_postponed_task(i);
40657f524caSIlya Isaev             __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr);
40749e08aacStbbdev         }
40849e08aacStbbdev         if( postponed_task ) {
40949e08aacStbbdev             // make the task available for other workers since we do not know successors'
41049e08aacStbbdev             // execution policy
41149e08aacStbbdev             spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
41249e08aacStbbdev         }
41349e08aacStbbdev         graph_task* successor_task = successors().try_put_task(v);
41449e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER
41549e08aacStbbdev #pragma warning (push)
41649e08aacStbbdev #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
41749e08aacStbbdev #endif
41849e08aacStbbdev         if(has_policy<lightweight, Policy>::value) {
41949e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER
42049e08aacStbbdev #pragma warning (pop)
42149e08aacStbbdev #endif
42249e08aacStbbdev             if(!successor_task) {
42349e08aacStbbdev                 // Return confirmative status since current
42449e08aacStbbdev                 // node's body has been executed anyway
42549e08aacStbbdev                 successor_task = SUCCESSFULLY_ENQUEUED;
42649e08aacStbbdev             }
42749e08aacStbbdev         }
42849e08aacStbbdev         return successor_task;
42949e08aacStbbdev     }
43049e08aacStbbdev 
43149e08aacStbbdev protected:
43249e08aacStbbdev 
reset_function_input(reset_flags f)43349e08aacStbbdev     void reset_function_input(reset_flags f) {
43449e08aacStbbdev         base_type::reset_function_input_base(f);
43549e08aacStbbdev         if(f & rf_reset_bodies) {
43649e08aacStbbdev             function_body_type *tmp = my_init_body->clone();
43749e08aacStbbdev             delete my_body;
43849e08aacStbbdev             my_body = tmp;
43949e08aacStbbdev         }
44049e08aacStbbdev     }
44149e08aacStbbdev 
44249e08aacStbbdev     function_body_type *my_body;
44349e08aacStbbdev     function_body_type *my_init_body;
44449e08aacStbbdev     virtual broadcast_cache<output_type > &successors() = 0;
44549e08aacStbbdev 
44649e08aacStbbdev };  // function_input
44749e08aacStbbdev 
44849e08aacStbbdev 
44949e08aacStbbdev // helper templates to clear the successor edges of the output ports of an multifunction_node
// Compile-time recursion over the output ports of a multifunction_node:
// clear_element<N> handles port N-1 and then recurses towards the base case N == 1.
template<int N> struct clear_element {
    //! Drops the successor edges of ports N-1, N-2, ..., 0, in that order.
    template<typename P> static void clear_this(P &p) {
        auto& port = std::get<N-1>(p);
        (void)port.successors().clear();
        clear_element<N-1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    //! True only if ports N-1 .. 0 all have no successors (highest index checked first).
    template<typename P> static bool this_empty(P &p) {
        return std::get<N-1>(p).successors().empty()
            && clear_element<N-1>::this_empty(p);
    }
#endif
};

// Recursion terminator: only port 0 remains.
template<> struct clear_element<1> {
    template<typename P> static void clear_this(P &p) {
        auto& first_port = std::get<0>(p);
        (void)first_port.successors().clear();
    }
#if TBB_USE_ASSERT
    template<typename P> static bool this_empty(P &p) {
        auto& first_port = std::get<0>(p);
        return first_port.successors().empty();
    }
#endif
};
47449e08aacStbbdev 
47549e08aacStbbdev template <typename OutputTuple>
47649e08aacStbbdev struct init_output_ports {
47749e08aacStbbdev     template <typename... Args>
47849e08aacStbbdev     static OutputTuple call(graph& g, const std::tuple<Args...>&) {
47949e08aacStbbdev         return OutputTuple(Args(g)...);
48049e08aacStbbdev     }
48149e08aacStbbdev }; // struct init_output_ports
48249e08aacStbbdev 
48349e08aacStbbdev //! Implements methods for a function node that takes a type Input as input
48449e08aacStbbdev //  and has a tuple of output ports specified.
48549e08aacStbbdev template< typename Input, typename OutputPortSet, typename Policy, typename A>
48649e08aacStbbdev class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
48749e08aacStbbdev public:
48849e08aacStbbdev     static const int N = std::tuple_size<OutputPortSet>::value;
48949e08aacStbbdev     typedef Input input_type;
49049e08aacStbbdev     typedef OutputPortSet output_ports_type;
49149e08aacStbbdev     typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
49249e08aacStbbdev     typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
49349e08aacStbbdev     typedef function_input_base<Input, Policy, A, my_class> base_type;
49449e08aacStbbdev     typedef function_input_queue<input_type, A> input_queue_type;
49549e08aacStbbdev 
49649e08aacStbbdev     // constructor
49749e08aacStbbdev     template<typename Body>
49849e08aacStbbdev     multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
499*a088cfa0SKonstantin Boyarinov       : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
50049e08aacStbbdev       , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
50149e08aacStbbdev       , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
50249e08aacStbbdev       , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
50349e08aacStbbdev     }
50449e08aacStbbdev 
50549e08aacStbbdev     //! Copy constructor
50649e08aacStbbdev     multifunction_input( const multifunction_input& src ) :
50749e08aacStbbdev         base_type(src),
50849e08aacStbbdev         my_body( src.my_init_body->clone() ),
50949e08aacStbbdev         my_init_body(src.my_init_body->clone() ),
51049e08aacStbbdev         my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
51149e08aacStbbdev     }
51249e08aacStbbdev 
51349e08aacStbbdev     ~multifunction_input() {
51449e08aacStbbdev         delete my_body;
51549e08aacStbbdev         delete my_init_body;
51649e08aacStbbdev     }
51749e08aacStbbdev 
51849e08aacStbbdev     template< typename Body >
51949e08aacStbbdev     Body copy_function_object() {
52049e08aacStbbdev         multifunction_body_type &body_ref = *this->my_body;
52149e08aacStbbdev         return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
52249e08aacStbbdev     }
52349e08aacStbbdev 
52449e08aacStbbdev     // for multifunction nodes we do not have a single successor as such.  So we just tell
52549e08aacStbbdev     // the task we were successful.
52649e08aacStbbdev     //TODO: consider moving common parts with implementation in function_input into separate function
52749e08aacStbbdev     graph_task* apply_body_impl_bypass( const input_type &i ) {
52849e08aacStbbdev         fgt_begin_body( my_body );
52949e08aacStbbdev         (*my_body)(i, my_output_ports);
53049e08aacStbbdev         fgt_end_body( my_body );
53157f524caSIlya Isaev         graph_task* ttask = nullptr;
53249e08aacStbbdev         if(base_type::my_max_concurrency != 0) {
53349e08aacStbbdev             ttask = base_type::try_get_postponed_task(i);
53449e08aacStbbdev         }
53549e08aacStbbdev         return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
53649e08aacStbbdev     }
53749e08aacStbbdev 
53849e08aacStbbdev     output_ports_type &output_ports(){ return my_output_ports; }
53949e08aacStbbdev 
54049e08aacStbbdev protected:
54149e08aacStbbdev 
54249e08aacStbbdev     void reset(reset_flags f) {
54349e08aacStbbdev         base_type::reset_function_input_base(f);
54449e08aacStbbdev         if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
54549e08aacStbbdev         if(f & rf_reset_bodies) {
54649e08aacStbbdev             multifunction_body_type* tmp = my_init_body->clone();
54749e08aacStbbdev             delete my_body;
54849e08aacStbbdev             my_body = tmp;
54949e08aacStbbdev         }
55049e08aacStbbdev         __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
55149e08aacStbbdev     }
55249e08aacStbbdev 
55349e08aacStbbdev     multifunction_body_type *my_body;
55449e08aacStbbdev     multifunction_body_type *my_init_body;
55549e08aacStbbdev     output_ports_type my_output_ports;
55649e08aacStbbdev 
55749e08aacStbbdev };  // multifunction_input
55849e08aacStbbdev 
//! Returns a reference to output port N of a multifunction-style node.
/** MOP must provide an output_ports_type typedef (tuple-like) and a non-const
    output_ports() accessor that returns that tuple by lvalue reference. */
template<size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) {
    auto &ports = op.output_ports();
    return std::get<N>(ports);
}
56449e08aacStbbdev 
56549e08aacStbbdev inline void check_task_and_spawn(graph& g, graph_task* t) {
56649e08aacStbbdev     if (t && t != SUCCESSFULLY_ENQUEUED) {
56749e08aacStbbdev         spawn_in_graph_arena(g, *t);
56849e08aacStbbdev     }
56949e08aacStbbdev }
57049e08aacStbbdev 
57149e08aacStbbdev // helper structs for split_node
57249e08aacStbbdev template<int N>
57349e08aacStbbdev struct emit_element {
57449e08aacStbbdev     template<typename T, typename P>
57549e08aacStbbdev     static graph_task* emit_this(graph& g, const T &t, P &p) {
57649e08aacStbbdev         // TODO: consider to collect all the tasks in task_list and spawn them all at once
57749e08aacStbbdev         graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
57849e08aacStbbdev         check_task_and_spawn(g, last_task);
57949e08aacStbbdev         return emit_element<N-1>::emit_this(g,t,p);
58049e08aacStbbdev     }
58149e08aacStbbdev };
58249e08aacStbbdev 
58349e08aacStbbdev template<>
58449e08aacStbbdev struct emit_element<1> {
58549e08aacStbbdev     template<typename T, typename P>
58649e08aacStbbdev     static graph_task* emit_this(graph& g, const T &t, P &p) {
58749e08aacStbbdev         graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
58849e08aacStbbdev         check_task_and_spawn(g, last_task);
58949e08aacStbbdev         return SUCCESSFULLY_ENQUEUED;
59049e08aacStbbdev     }
59149e08aacStbbdev };
59249e08aacStbbdev 
59349e08aacStbbdev //! Implements methods for an executable node that takes continue_msg as input
59449e08aacStbbdev template< typename Output, typename Policy>
59549e08aacStbbdev class continue_input : public continue_receiver {
59649e08aacStbbdev public:
59749e08aacStbbdev 
59849e08aacStbbdev     //! The input type of this receiver
59949e08aacStbbdev     typedef continue_msg input_type;
60049e08aacStbbdev 
60149e08aacStbbdev     //! The output type of this receiver
60249e08aacStbbdev     typedef Output output_type;
60349e08aacStbbdev     typedef function_body<input_type, output_type> function_body_type;
60449e08aacStbbdev     typedef continue_input<output_type, Policy> class_type;
60549e08aacStbbdev 
60649e08aacStbbdev     template< typename Body >
60749e08aacStbbdev     continue_input( graph &g, Body& body, node_priority_t a_priority )
60849e08aacStbbdev         : continue_receiver(/*number_of_predecessors=*/0, a_priority)
60949e08aacStbbdev         , my_graph_ref(g)
61049e08aacStbbdev         , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
61149e08aacStbbdev         , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
61249e08aacStbbdev     { }
61349e08aacStbbdev 
61449e08aacStbbdev     template< typename Body >
61549e08aacStbbdev     continue_input( graph &g, int number_of_predecessors,
61649e08aacStbbdev                     Body& body, node_priority_t a_priority )
61749e08aacStbbdev       : continue_receiver( number_of_predecessors, a_priority )
61849e08aacStbbdev       , my_graph_ref(g)
61949e08aacStbbdev       , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
62049e08aacStbbdev       , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
62149e08aacStbbdev     { }
62249e08aacStbbdev 
62349e08aacStbbdev     continue_input( const continue_input& src ) : continue_receiver(src),
62449e08aacStbbdev                                                   my_graph_ref(src.my_graph_ref),
62549e08aacStbbdev                                                   my_body( src.my_init_body->clone() ),
62649e08aacStbbdev                                                   my_init_body( src.my_init_body->clone() ) {}
62749e08aacStbbdev 
62849e08aacStbbdev     ~continue_input() {
62949e08aacStbbdev         delete my_body;
63049e08aacStbbdev         delete my_init_body;
63149e08aacStbbdev     }
63249e08aacStbbdev 
63349e08aacStbbdev     template< typename Body >
63449e08aacStbbdev     Body copy_function_object() {
63549e08aacStbbdev         function_body_type &body_ref = *my_body;
63649e08aacStbbdev         return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
63749e08aacStbbdev     }
63849e08aacStbbdev 
63949e08aacStbbdev     void reset_receiver( reset_flags f) override {
64049e08aacStbbdev         continue_receiver::reset_receiver(f);
64149e08aacStbbdev         if(f & rf_reset_bodies) {
64249e08aacStbbdev             function_body_type *tmp = my_init_body->clone();
64349e08aacStbbdev             delete my_body;
64449e08aacStbbdev             my_body = tmp;
64549e08aacStbbdev         }
64649e08aacStbbdev     }
64749e08aacStbbdev 
64849e08aacStbbdev protected:
64949e08aacStbbdev 
65049e08aacStbbdev     graph& my_graph_ref;
65149e08aacStbbdev     function_body_type *my_body;
65249e08aacStbbdev     function_body_type *my_init_body;
65349e08aacStbbdev 
65449e08aacStbbdev     virtual broadcast_cache<output_type > &successors() = 0;
65549e08aacStbbdev 
65649e08aacStbbdev     friend class apply_body_task_bypass< class_type, continue_msg >;
65749e08aacStbbdev 
65849e08aacStbbdev     //! Applies the body to the provided input
65949e08aacStbbdev     graph_task* apply_body_bypass( input_type ) {
66049e08aacStbbdev         // There is an extra copied needed to capture the
66149e08aacStbbdev         // body execution without the try_put
66249e08aacStbbdev         fgt_begin_body( my_body );
66349e08aacStbbdev         output_type v = (*my_body)( continue_msg() );
66449e08aacStbbdev         fgt_end_body( my_body );
66549e08aacStbbdev         return successors().try_put_task( v );
66649e08aacStbbdev     }
66749e08aacStbbdev 
66849e08aacStbbdev     graph_task* execute() override {
66949e08aacStbbdev         if(!is_graph_active(my_graph_ref)) {
67057f524caSIlya Isaev             return nullptr;
67149e08aacStbbdev         }
67249e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER
67349e08aacStbbdev #pragma warning (push)
67449e08aacStbbdev #pragma warning (disable: 4127)  /* suppress conditional expression is constant */
67549e08aacStbbdev #endif
67649e08aacStbbdev         if(has_policy<lightweight, Policy>::value) {
67749e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER
67849e08aacStbbdev #pragma warning (pop)
67949e08aacStbbdev #endif
68049e08aacStbbdev             return apply_body_bypass( continue_msg() );
68149e08aacStbbdev         }
68249e08aacStbbdev         else {
68349e08aacStbbdev             small_object_allocator allocator{};
68449e08aacStbbdev             typedef apply_body_task_bypass<class_type, continue_msg> task_type;
68549e08aacStbbdev             graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
68649e08aacStbbdev             graph_reference().reserve_wait();
68749e08aacStbbdev             return t;
68849e08aacStbbdev         }
68949e08aacStbbdev     }
69049e08aacStbbdev 
69149e08aacStbbdev     graph& graph_reference() const override {
69249e08aacStbbdev         return my_graph_ref;
69349e08aacStbbdev     }
69449e08aacStbbdev };  // continue_input
69549e08aacStbbdev 
69649e08aacStbbdev //! Implements methods for both executable and function nodes that puts Output to its successors
69749e08aacStbbdev template< typename Output >
69849e08aacStbbdev class function_output : public sender<Output> {
69949e08aacStbbdev public:
70049e08aacStbbdev 
70149e08aacStbbdev     template<int N> friend struct clear_element;
70249e08aacStbbdev     typedef Output output_type;
70349e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
70449e08aacStbbdev     typedef broadcast_cache<output_type> broadcast_cache_type;
70549e08aacStbbdev 
70649e08aacStbbdev     function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
70749e08aacStbbdev     function_output(const function_output& other) = delete;
70849e08aacStbbdev 
70949e08aacStbbdev     //! Adds a new successor to this node
71049e08aacStbbdev     bool register_successor( successor_type &r ) override {
71149e08aacStbbdev         successors().register_successor( r );
71249e08aacStbbdev         return true;
71349e08aacStbbdev     }
71449e08aacStbbdev 
71549e08aacStbbdev     //! Removes a successor from this node
71649e08aacStbbdev     bool remove_successor( successor_type &r ) override {
71749e08aacStbbdev         successors().remove_successor( r );
71849e08aacStbbdev         return true;
71949e08aacStbbdev     }
72049e08aacStbbdev 
72149e08aacStbbdev     broadcast_cache_type &successors() { return my_successors; }
72249e08aacStbbdev 
72349e08aacStbbdev     graph& graph_reference() const { return my_graph_ref; }
72449e08aacStbbdev protected:
72549e08aacStbbdev     broadcast_cache_type my_successors;
72649e08aacStbbdev     graph& my_graph_ref;
72749e08aacStbbdev };  // function_output
72849e08aacStbbdev 
72949e08aacStbbdev template< typename Output >
73049e08aacStbbdev class multifunction_output : public function_output<Output> {
73149e08aacStbbdev public:
73249e08aacStbbdev     typedef Output output_type;
73349e08aacStbbdev     typedef function_output<output_type> base_type;
73449e08aacStbbdev     using base_type::my_successors;
73549e08aacStbbdev 
73649e08aacStbbdev     multifunction_output(graph& g) : base_type(g) {}
73749e08aacStbbdev     multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}
73849e08aacStbbdev 
73949e08aacStbbdev     bool try_put(const output_type &i) {
74049e08aacStbbdev         graph_task *res = try_put_task(i);
74149e08aacStbbdev         if( !res ) return false;
74249e08aacStbbdev         if( res != SUCCESSFULLY_ENQUEUED ) {
74349e08aacStbbdev             // wrapping in task_arena::execute() is not needed since the method is called from
74449e08aacStbbdev             // inside task::execute()
74549e08aacStbbdev             spawn_in_graph_arena(graph_reference(), *res);
74649e08aacStbbdev         }
74749e08aacStbbdev         return true;
74849e08aacStbbdev     }
74949e08aacStbbdev 
75049e08aacStbbdev     using base_type::graph_reference;
75149e08aacStbbdev 
75249e08aacStbbdev protected:
75349e08aacStbbdev 
75449e08aacStbbdev     graph_task* try_put_task(const output_type &i) {
75549e08aacStbbdev         return my_successors.try_put_task(i);
75649e08aacStbbdev     }
75749e08aacStbbdev 
75849e08aacStbbdev     template <int N> friend struct emit_element;
75949e08aacStbbdev 
76049e08aacStbbdev };  // multifunction_output
76149e08aacStbbdev 
//composite_node
//! Recursion terminator: there are no nodes left to pass on.
template<typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

//! Calls fgt_alias_port(c_node, <address of node>, visible) for every node in
//! the argument list, in argument order, peeling one node per recursive call.
template< typename CompositeType, typename NodeType1, typename... NodeTypes >
void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    NodeType1 *node_ptr = const_cast<NodeType1 *>(&n1);
    fgt_alias_port(c_node, static_cast<void *>(node_ptr), visible);
    add_nodes_impl(c_node, visible, n...);
}
77349e08aacStbbdev 
77449e08aacStbbdev #endif // __TBB__flow_graph_node_impl_H
775