149e08aacStbbdev /* 2*a088cfa0SKonstantin Boyarinov Copyright (c) 2005-2023 Intel Corporation 349e08aacStbbdev 449e08aacStbbdev Licensed under the Apache License, Version 2.0 (the "License"); 549e08aacStbbdev you may not use this file except in compliance with the License. 649e08aacStbbdev You may obtain a copy of the License at 749e08aacStbbdev 849e08aacStbbdev http://www.apache.org/licenses/LICENSE-2.0 949e08aacStbbdev 1049e08aacStbbdev Unless required by applicable law or agreed to in writing, software 1149e08aacStbbdev distributed under the License is distributed on an "AS IS" BASIS, 1249e08aacStbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1349e08aacStbbdev See the License for the specific language governing permissions and 1449e08aacStbbdev limitations under the License. 1549e08aacStbbdev */ 1649e08aacStbbdev 1749e08aacStbbdev #ifndef __TBB__flow_graph_node_impl_H 1849e08aacStbbdev #define __TBB__flow_graph_node_impl_H 1949e08aacStbbdev 2049e08aacStbbdev #ifndef __TBB_flow_graph_H 2149e08aacStbbdev #error Do not #include this internal file directly; use public TBB headers instead. 2249e08aacStbbdev #endif 2349e08aacStbbdev 2449e08aacStbbdev #include "_flow_graph_item_buffer_impl.h" 2549e08aacStbbdev 2649e08aacStbbdev template< typename T, typename A > 2749e08aacStbbdev class function_input_queue : public item_buffer<T,A> { 2849e08aacStbbdev public: empty()2949e08aacStbbdev bool empty() const { 3049e08aacStbbdev return this->buffer_empty(); 3149e08aacStbbdev } 3249e08aacStbbdev front()3349e08aacStbbdev const T& front() const { 3449e08aacStbbdev return this->item_buffer<T, A>::front(); 3549e08aacStbbdev } 3649e08aacStbbdev pop()3749e08aacStbbdev void pop() { 3849e08aacStbbdev this->destroy_front(); 3949e08aacStbbdev } 4049e08aacStbbdev push(T & t)4149e08aacStbbdev bool push( T& t ) { 4249e08aacStbbdev return this->push_back( t ); 4349e08aacStbbdev } 4449e08aacStbbdev }; 4549e08aacStbbdev 4649e08aacStbbdev //! Input and scheduling for a function node that takes a type Input as input 4749e08aacStbbdev // The only up-ref is apply_body_impl, which should implement the function 4849e08aacStbbdev // call and any handling of the result. 4949e08aacStbbdev template< typename Input, typename Policy, typename A, typename ImplType > 5049e08aacStbbdev class function_input_base : public receiver<Input>, no_assign { 5149e08aacStbbdev enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency 5249e08aacStbbdev }; 5349e08aacStbbdev typedef function_input_base<Input, Policy, A, ImplType> class_type; 5449e08aacStbbdev 5549e08aacStbbdev public: 5649e08aacStbbdev 5749e08aacStbbdev //! The input type of this receiver 5849e08aacStbbdev typedef Input input_type; 5949e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type; 6049e08aacStbbdev typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type; 6149e08aacStbbdev typedef function_input_queue<input_type, A> input_queue_type; 6249e08aacStbbdev typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type; 6349e08aacStbbdev static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, ""); 6449e08aacStbbdev 6549e08aacStbbdev //! Constructor for function_input_base function_input_base(graph & g,size_t max_concurrency,node_priority_t a_priority,bool is_no_throw)66324afd9eSIlya Mishin function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw ) 6749e08aacStbbdev : my_graph_ref(g), my_max_concurrency(max_concurrency) 68324afd9eSIlya Mishin , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw) 6957f524caSIlya Isaev , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr) 7049e08aacStbbdev , my_predecessors(this) 7149e08aacStbbdev , forwarder_busy(false) 7249e08aacStbbdev { 7349e08aacStbbdev my_aggregator.initialize_handler(handler_type(this)); 7449e08aacStbbdev } 7549e08aacStbbdev 7649e08aacStbbdev //! Copy constructor function_input_base(const function_input_base & src)7749e08aacStbbdev function_input_base( const function_input_base& src ) 78324afd9eSIlya Mishin : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {} 7949e08aacStbbdev 8049e08aacStbbdev //! Destructor 8149e08aacStbbdev // The queue is allocated by the constructor for {multi}function_node. 8249e08aacStbbdev // TODO: pass the graph_buffer_policy to the base so it can allocate the queue instead. 8349e08aacStbbdev // This would be an interface-breaking change. ~function_input_base()8449e08aacStbbdev virtual ~function_input_base() { 85ba947f18SIlya Isaev delete my_queue; 86ba947f18SIlya Isaev my_queue = nullptr; 8749e08aacStbbdev } 8849e08aacStbbdev try_put_task(const input_type & t)8949e08aacStbbdev graph_task* try_put_task( const input_type& t) override { 90324afd9eSIlya Mishin if ( my_is_no_throw ) 9149e08aacStbbdev return try_put_task_impl(t, has_policy<lightweight, Policy>()); 92324afd9eSIlya Mishin else 93324afd9eSIlya Mishin return try_put_task_impl(t, std::false_type()); 9449e08aacStbbdev } 9549e08aacStbbdev 9649e08aacStbbdev //! Adds src to the list of cached predecessors. register_predecessor(predecessor_type & src)9749e08aacStbbdev bool register_predecessor( predecessor_type &src ) override { 9849e08aacStbbdev operation_type op_data(reg_pred); 9949e08aacStbbdev op_data.r = &src; 10049e08aacStbbdev my_aggregator.execute(&op_data); 10149e08aacStbbdev return true; 10249e08aacStbbdev } 10349e08aacStbbdev 10449e08aacStbbdev //! Removes src from the list of cached predecessors. remove_predecessor(predecessor_type & src)10549e08aacStbbdev bool remove_predecessor( predecessor_type &src ) override { 10649e08aacStbbdev operation_type op_data(rem_pred); 10749e08aacStbbdev op_data.r = &src; 10849e08aacStbbdev my_aggregator.execute(&op_data); 10949e08aacStbbdev return true; 11049e08aacStbbdev } 11149e08aacStbbdev 11249e08aacStbbdev protected: 11349e08aacStbbdev reset_function_input_base(reset_flags f)11449e08aacStbbdev void reset_function_input_base( reset_flags f) { 11549e08aacStbbdev my_concurrency = 0; 11649e08aacStbbdev if(my_queue) { 11749e08aacStbbdev my_queue->reset(); 11849e08aacStbbdev } 11949e08aacStbbdev reset_receiver(f); 12049e08aacStbbdev forwarder_busy = false; 12149e08aacStbbdev } 12249e08aacStbbdev 12349e08aacStbbdev graph& my_graph_ref; 12449e08aacStbbdev const size_t my_max_concurrency; 12549e08aacStbbdev size_t my_concurrency; 12649e08aacStbbdev node_priority_t my_priority; 127324afd9eSIlya Mishin const bool my_is_no_throw; 12849e08aacStbbdev input_queue_type *my_queue; 12949e08aacStbbdev predecessor_cache<input_type, null_mutex > my_predecessors; 13049e08aacStbbdev reset_receiver(reset_flags f)13149e08aacStbbdev void reset_receiver( reset_flags f) { 13249e08aacStbbdev if( f & rf_clear_edges) my_predecessors.clear(); 13349e08aacStbbdev else 13449e08aacStbbdev my_predecessors.reset(); 13549e08aacStbbdev __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed"); 13649e08aacStbbdev } 13749e08aacStbbdev graph_reference()13849e08aacStbbdev graph& graph_reference() const override { 13949e08aacStbbdev return my_graph_ref; 14049e08aacStbbdev } 14149e08aacStbbdev try_get_postponed_task(const input_type & i)14249e08aacStbbdev graph_task* try_get_postponed_task(const input_type& i) { 14349e08aacStbbdev operation_type op_data(i, app_body_bypass); // tries to pop an item or get_item 14449e08aacStbbdev my_aggregator.execute(&op_data); 14549e08aacStbbdev return op_data.bypass_t; 14649e08aacStbbdev } 14749e08aacStbbdev 14849e08aacStbbdev private: 14949e08aacStbbdev 15049e08aacStbbdev friend class apply_body_task_bypass< class_type, input_type >; 15149e08aacStbbdev friend class forward_task_bypass< class_type >; 15249e08aacStbbdev 15349e08aacStbbdev class operation_type : public aggregated_operation< operation_type > { 15449e08aacStbbdev public: 15549e08aacStbbdev char type; 15649e08aacStbbdev union { 15749e08aacStbbdev input_type *elem; 15849e08aacStbbdev predecessor_type *r; 15949e08aacStbbdev }; 16049e08aacStbbdev graph_task* bypass_t; operation_type(const input_type & e,op_type t)16149e08aacStbbdev operation_type(const input_type& e, op_type t) : 162f2af7473Skboyarinov type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr) {} operation_type(op_type t)163f2af7473Skboyarinov operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {} 16449e08aacStbbdev }; 16549e08aacStbbdev 16649e08aacStbbdev bool forwarder_busy; 16749e08aacStbbdev typedef aggregating_functor<class_type, operation_type> handler_type; 16849e08aacStbbdev friend class aggregating_functor<class_type, operation_type>; 16949e08aacStbbdev aggregator< handler_type, operation_type > my_aggregator; 17049e08aacStbbdev perform_queued_requests()17149e08aacStbbdev graph_task* perform_queued_requests() { 17257f524caSIlya Isaev graph_task* new_task = nullptr; 17349e08aacStbbdev if(my_queue) { 17449e08aacStbbdev if(!my_queue->empty()) { 17549e08aacStbbdev ++my_concurrency; 17649e08aacStbbdev new_task = create_body_task(my_queue->front()); 17749e08aacStbbdev 17849e08aacStbbdev my_queue->pop(); 17949e08aacStbbdev } 18049e08aacStbbdev } 18149e08aacStbbdev else { 18249e08aacStbbdev input_type i; 18349e08aacStbbdev if(my_predecessors.get_item(i)) { 18449e08aacStbbdev ++my_concurrency; 18549e08aacStbbdev new_task = create_body_task(i); 18649e08aacStbbdev } 18749e08aacStbbdev } 18849e08aacStbbdev return new_task; 18949e08aacStbbdev } handle_operations(operation_type * op_list)19049e08aacStbbdev void handle_operations(operation_type *op_list) { 19149e08aacStbbdev operation_type* tmp; 19249e08aacStbbdev while (op_list) { 19349e08aacStbbdev tmp = op_list; 19449e08aacStbbdev op_list = op_list->next; 19549e08aacStbbdev switch (tmp->type) { 19649e08aacStbbdev case reg_pred: 19749e08aacStbbdev my_predecessors.add(*(tmp->r)); 19849e08aacStbbdev tmp->status.store(SUCCEEDED, std::memory_order_release); 19949e08aacStbbdev if (!forwarder_busy) { 20049e08aacStbbdev forwarder_busy = true; 20149e08aacStbbdev spawn_forward_task(); 20249e08aacStbbdev } 20349e08aacStbbdev break; 20449e08aacStbbdev case rem_pred: 20549e08aacStbbdev my_predecessors.remove(*(tmp->r)); 20649e08aacStbbdev tmp->status.store(SUCCEEDED, std::memory_order_release); 20749e08aacStbbdev break; 20849e08aacStbbdev case app_body_bypass: { 20957f524caSIlya Isaev tmp->bypass_t = nullptr; 21057f524caSIlya Isaev __TBB_ASSERT(my_max_concurrency != 0, nullptr); 21149e08aacStbbdev --my_concurrency; 21249e08aacStbbdev if(my_concurrency<my_max_concurrency) 21349e08aacStbbdev tmp->bypass_t = perform_queued_requests(); 21449e08aacStbbdev tmp->status.store(SUCCEEDED, std::memory_order_release); 21549e08aacStbbdev } 21649e08aacStbbdev break; 21749e08aacStbbdev case tryput_bypass: internal_try_put_task(tmp); break; 21849e08aacStbbdev case try_fwd: internal_forward(tmp); break; 21949e08aacStbbdev case occupy_concurrency: 22049e08aacStbbdev if (my_concurrency < my_max_concurrency) { 22149e08aacStbbdev ++my_concurrency; 22249e08aacStbbdev tmp->status.store(SUCCEEDED, std::memory_order_release); 22349e08aacStbbdev } else { 22449e08aacStbbdev tmp->status.store(FAILED, std::memory_order_release); 22549e08aacStbbdev } 22649e08aacStbbdev break; 22749e08aacStbbdev } 22849e08aacStbbdev } 22949e08aacStbbdev } 23049e08aacStbbdev 23149e08aacStbbdev //! Put to the node, but return the task instead of enqueueing it internal_try_put_task(operation_type * op)23249e08aacStbbdev void internal_try_put_task(operation_type *op) { 23357f524caSIlya Isaev __TBB_ASSERT(my_max_concurrency != 0, nullptr); 23449e08aacStbbdev if (my_concurrency < my_max_concurrency) { 23549e08aacStbbdev ++my_concurrency; 23649e08aacStbbdev graph_task * new_task = create_body_task(*(op->elem)); 23749e08aacStbbdev op->bypass_t = new_task; 23849e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release); 23949e08aacStbbdev } else if ( my_queue && my_queue->push(*(op->elem)) ) { 24049e08aacStbbdev op->bypass_t = SUCCESSFULLY_ENQUEUED; 24149e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release); 24249e08aacStbbdev } else { 24357f524caSIlya Isaev op->bypass_t = nullptr; 24449e08aacStbbdev op->status.store(FAILED, std::memory_order_release); 24549e08aacStbbdev } 24649e08aacStbbdev } 24749e08aacStbbdev 24849e08aacStbbdev //! Creates tasks for postponed messages if available and if concurrency allows internal_forward(operation_type * op)24949e08aacStbbdev void internal_forward(operation_type *op) { 25057f524caSIlya Isaev op->bypass_t = nullptr; 25149e08aacStbbdev if (my_concurrency < my_max_concurrency) 25249e08aacStbbdev op->bypass_t = perform_queued_requests(); 25349e08aacStbbdev if(op->bypass_t) 25449e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release); 25549e08aacStbbdev else { 25649e08aacStbbdev forwarder_busy = false; 25749e08aacStbbdev op->status.store(FAILED, std::memory_order_release); 25849e08aacStbbdev } 25949e08aacStbbdev } 26049e08aacStbbdev internal_try_put_bypass(const input_type & t)26149e08aacStbbdev graph_task* internal_try_put_bypass( const input_type& t ) { 26249e08aacStbbdev operation_type op_data(t, tryput_bypass); 26349e08aacStbbdev my_aggregator.execute(&op_data); 26449e08aacStbbdev if( op_data.status == SUCCEEDED ) { 26549e08aacStbbdev return op_data.bypass_t; 26649e08aacStbbdev } 26757f524caSIlya Isaev return nullptr; 26849e08aacStbbdev } 26949e08aacStbbdev try_put_task_impl(const input_type & t,std::true_type)27049e08aacStbbdev graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::true_type ) { 27149e08aacStbbdev if( my_max_concurrency == 0 ) { 27249e08aacStbbdev return apply_body_bypass(t); 27349e08aacStbbdev } else { 27449e08aacStbbdev operation_type check_op(t, occupy_concurrency); 27549e08aacStbbdev my_aggregator.execute(&check_op); 27649e08aacStbbdev if( check_op.status == SUCCEEDED ) { 27749e08aacStbbdev return apply_body_bypass(t); 27849e08aacStbbdev } 27949e08aacStbbdev return internal_try_put_bypass(t); 28049e08aacStbbdev } 28149e08aacStbbdev } 28249e08aacStbbdev try_put_task_impl(const input_type & t,std::false_type)28349e08aacStbbdev graph_task* try_put_task_impl( const input_type& t, /*lightweight=*/std::false_type ) { 28449e08aacStbbdev if( my_max_concurrency == 0 ) { 28549e08aacStbbdev return create_body_task(t); 28649e08aacStbbdev } else { 28749e08aacStbbdev return internal_try_put_bypass(t); 28849e08aacStbbdev } 28949e08aacStbbdev } 29049e08aacStbbdev 29149e08aacStbbdev //! Applies the body to the provided input 29249e08aacStbbdev // then decides if more work is available apply_body_bypass(const input_type & i)29349e08aacStbbdev graph_task* apply_body_bypass( const input_type &i ) { 29449e08aacStbbdev return static_cast<ImplType *>(this)->apply_body_impl_bypass(i); 29549e08aacStbbdev } 29649e08aacStbbdev 29749e08aacStbbdev //! allocates a task to apply a body create_body_task(const input_type & input)29849e08aacStbbdev graph_task* create_body_task( const input_type &input ) { 29949e08aacStbbdev if (!is_graph_active(my_graph_ref)) { 30049e08aacStbbdev return nullptr; 30149e08aacStbbdev } 30249e08aacStbbdev // TODO revamp: extract helper for common graph task allocation part 30349e08aacStbbdev small_object_allocator allocator{}; 30449e08aacStbbdev typedef apply_body_task_bypass<class_type, input_type> task_type; 30549e08aacStbbdev graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority ); 30649e08aacStbbdev graph_reference().reserve_wait(); 30749e08aacStbbdev return t; 30849e08aacStbbdev } 30949e08aacStbbdev 31049e08aacStbbdev //! This is executed by an enqueued task, the "forwarder" forward_task()31149e08aacStbbdev graph_task* forward_task() { 31249e08aacStbbdev operation_type op_data(try_fwd); 31357f524caSIlya Isaev graph_task* rval = nullptr; 31449e08aacStbbdev do { 31549e08aacStbbdev op_data.status = WAIT; 31649e08aacStbbdev my_aggregator.execute(&op_data); 31749e08aacStbbdev if(op_data.status == SUCCEEDED) { 31849e08aacStbbdev graph_task* ttask = op_data.bypass_t; 31957f524caSIlya Isaev __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr); 32049e08aacStbbdev rval = combine_tasks(my_graph_ref, rval, ttask); 32149e08aacStbbdev } 32249e08aacStbbdev } while (op_data.status == SUCCEEDED); 32349e08aacStbbdev return rval; 32449e08aacStbbdev } 32549e08aacStbbdev create_forward_task()32649e08aacStbbdev inline graph_task* create_forward_task() { 32749e08aacStbbdev if (!is_graph_active(my_graph_ref)) { 32849e08aacStbbdev return nullptr; 32949e08aacStbbdev } 33049e08aacStbbdev small_object_allocator allocator{}; 33149e08aacStbbdev typedef forward_task_bypass<class_type> task_type; 33249e08aacStbbdev graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority ); 33349e08aacStbbdev graph_reference().reserve_wait(); 33449e08aacStbbdev return t; 33549e08aacStbbdev } 33649e08aacStbbdev 33749e08aacStbbdev //! Spawns a task that calls forward() spawn_forward_task()33849e08aacStbbdev inline void spawn_forward_task() { 33949e08aacStbbdev graph_task* tp = create_forward_task(); 34049e08aacStbbdev if(tp) { 34149e08aacStbbdev spawn_in_graph_arena(graph_reference(), *tp); 34249e08aacStbbdev } 34349e08aacStbbdev } 34449e08aacStbbdev priority()34549e08aacStbbdev node_priority_t priority() const override { return my_priority; } 34649e08aacStbbdev }; // function_input_base 34749e08aacStbbdev 34849e08aacStbbdev //! Implements methods for a function node that takes a type Input as input and sends 34949e08aacStbbdev // a type Output to its successors. 35049e08aacStbbdev template< typename Input, typename Output, typename Policy, typename A> 35149e08aacStbbdev class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > { 35249e08aacStbbdev public: 35349e08aacStbbdev typedef Input input_type; 35449e08aacStbbdev typedef Output output_type; 35549e08aacStbbdev typedef function_body<input_type, output_type> function_body_type; 35649e08aacStbbdev typedef function_input<Input, Output, Policy,A> my_class; 35749e08aacStbbdev typedef function_input_base<Input, Policy, A, my_class> base_type; 35849e08aacStbbdev typedef function_input_queue<input_type, A> input_queue_type; 35949e08aacStbbdev 36049e08aacStbbdev // constructor 36149e08aacStbbdev template<typename Body> function_input(graph & g,size_t max_concurrency,Body & body,node_priority_t a_priority)36249e08aacStbbdev function_input( 36349e08aacStbbdev graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority ) 364*a088cfa0SKonstantin Boyarinov : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type()))) 36549e08aacStbbdev , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 36649e08aacStbbdev , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) { 36749e08aacStbbdev } 36849e08aacStbbdev 36949e08aacStbbdev //! Copy constructor function_input(const function_input & src)37049e08aacStbbdev function_input( const function_input& src ) : 37149e08aacStbbdev base_type(src), 37249e08aacStbbdev my_body( src.my_init_body->clone() ), 37349e08aacStbbdev my_init_body(src.my_init_body->clone() ) { 37449e08aacStbbdev } 37549e08aacStbbdev #if __INTEL_COMPILER <= 2021 37649e08aacStbbdev // Suppress superfluous diagnostic about virtual keyword absence in a destructor of an inherited 37749e08aacStbbdev // class while the parent class has the virtual keyword for the destrocutor. 37849e08aacStbbdev virtual 37949e08aacStbbdev #endif ~function_input()38049e08aacStbbdev ~function_input() { 38149e08aacStbbdev delete my_body; 38249e08aacStbbdev delete my_init_body; 38349e08aacStbbdev } 38449e08aacStbbdev 38549e08aacStbbdev template< typename Body > copy_function_object()38649e08aacStbbdev Body copy_function_object() { 38749e08aacStbbdev function_body_type &body_ref = *this->my_body; 38849e08aacStbbdev return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 38949e08aacStbbdev } 39049e08aacStbbdev apply_body_impl(const input_type & i)39149e08aacStbbdev output_type apply_body_impl( const input_type& i) { 39249e08aacStbbdev // There is an extra copied needed to capture the 39349e08aacStbbdev // body execution without the try_put 39449e08aacStbbdev fgt_begin_body( my_body ); 395*a088cfa0SKonstantin Boyarinov output_type v = tbb::detail::invoke(*my_body, i); 39649e08aacStbbdev fgt_end_body( my_body ); 39749e08aacStbbdev return v; 39849e08aacStbbdev } 39949e08aacStbbdev 40049e08aacStbbdev //TODO: consider moving into the base class apply_body_impl_bypass(const input_type & i)40149e08aacStbbdev graph_task* apply_body_impl_bypass( const input_type &i) { 40249e08aacStbbdev output_type v = apply_body_impl(i); 40357f524caSIlya Isaev graph_task* postponed_task = nullptr; 40449e08aacStbbdev if( base_type::my_max_concurrency != 0 ) { 40549e08aacStbbdev postponed_task = base_type::try_get_postponed_task(i); 40657f524caSIlya Isaev __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr); 40749e08aacStbbdev } 40849e08aacStbbdev if( postponed_task ) { 40949e08aacStbbdev // make the task available for other workers since we do not know successors' 41049e08aacStbbdev // execution policy 41149e08aacStbbdev spawn_in_graph_arena(base_type::graph_reference(), *postponed_task); 41249e08aacStbbdev } 41349e08aacStbbdev graph_task* successor_task = successors().try_put_task(v); 41449e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER 41549e08aacStbbdev #pragma warning (push) 41649e08aacStbbdev #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 41749e08aacStbbdev #endif 41849e08aacStbbdev if(has_policy<lightweight, Policy>::value) { 41949e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER 42049e08aacStbbdev #pragma warning (pop) 42149e08aacStbbdev #endif 42249e08aacStbbdev if(!successor_task) { 42349e08aacStbbdev // Return confirmative status since current 42449e08aacStbbdev // node's body has been executed anyway 42549e08aacStbbdev successor_task = SUCCESSFULLY_ENQUEUED; 42649e08aacStbbdev } 42749e08aacStbbdev } 42849e08aacStbbdev return successor_task; 42949e08aacStbbdev } 43049e08aacStbbdev 43149e08aacStbbdev protected: 43249e08aacStbbdev reset_function_input(reset_flags f)43349e08aacStbbdev void reset_function_input(reset_flags f) { 43449e08aacStbbdev base_type::reset_function_input_base(f); 43549e08aacStbbdev if(f & rf_reset_bodies) { 43649e08aacStbbdev function_body_type *tmp = my_init_body->clone(); 43749e08aacStbbdev delete my_body; 43849e08aacStbbdev my_body = tmp; 43949e08aacStbbdev } 44049e08aacStbbdev } 44149e08aacStbbdev 44249e08aacStbbdev function_body_type *my_body; 44349e08aacStbbdev function_body_type *my_init_body; 44449e08aacStbbdev virtual broadcast_cache<output_type > &successors() = 0; 44549e08aacStbbdev 44649e08aacStbbdev }; // function_input 44749e08aacStbbdev 44849e08aacStbbdev 44949e08aacStbbdev // helper templates to clear the successor edges of the output ports of an multifunction_node 45049e08aacStbbdev template<int N> struct clear_element { clear_thisclear_element45149e08aacStbbdev template<typename P> static void clear_this(P &p) { 45249e08aacStbbdev (void)std::get<N-1>(p).successors().clear(); 45349e08aacStbbdev clear_element<N-1>::clear_this(p); 45449e08aacStbbdev } 45549e08aacStbbdev #if TBB_USE_ASSERT this_emptyclear_element45649e08aacStbbdev template<typename P> static bool this_empty(P &p) { 45749e08aacStbbdev if(std::get<N-1>(p).successors().empty()) 45849e08aacStbbdev return clear_element<N-1>::this_empty(p); 45949e08aacStbbdev return false; 46049e08aacStbbdev } 46149e08aacStbbdev #endif 46249e08aacStbbdev }; 46349e08aacStbbdev 46449e08aacStbbdev template<> struct clear_element<1> { 46549e08aacStbbdev template<typename P> static void clear_this(P &p) { 46649e08aacStbbdev (void)std::get<0>(p).successors().clear(); 46749e08aacStbbdev } 46849e08aacStbbdev #if TBB_USE_ASSERT 46949e08aacStbbdev template<typename P> static bool this_empty(P &p) { 47049e08aacStbbdev return std::get<0>(p).successors().empty(); 47149e08aacStbbdev } 47249e08aacStbbdev #endif 47349e08aacStbbdev }; 47449e08aacStbbdev 47549e08aacStbbdev template <typename OutputTuple> 47649e08aacStbbdev struct init_output_ports { 47749e08aacStbbdev template <typename... Args> 47849e08aacStbbdev static OutputTuple call(graph& g, const std::tuple<Args...>&) { 47949e08aacStbbdev return OutputTuple(Args(g)...); 48049e08aacStbbdev } 48149e08aacStbbdev }; // struct init_output_ports 48249e08aacStbbdev 48349e08aacStbbdev //! Implements methods for a function node that takes a type Input as input 48449e08aacStbbdev // and has a tuple of output ports specified. 48549e08aacStbbdev template< typename Input, typename OutputPortSet, typename Policy, typename A> 48649e08aacStbbdev class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > { 48749e08aacStbbdev public: 48849e08aacStbbdev static const int N = std::tuple_size<OutputPortSet>::value; 48949e08aacStbbdev typedef Input input_type; 49049e08aacStbbdev typedef OutputPortSet output_ports_type; 49149e08aacStbbdev typedef multifunction_body<input_type, output_ports_type> multifunction_body_type; 49249e08aacStbbdev typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class; 49349e08aacStbbdev typedef function_input_base<Input, Policy, A, my_class> base_type; 49449e08aacStbbdev typedef function_input_queue<input_type, A> input_queue_type; 49549e08aacStbbdev 49649e08aacStbbdev // constructor 49749e08aacStbbdev template<typename Body> 49849e08aacStbbdev multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority ) 499*a088cfa0SKonstantin Boyarinov : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports))) 50049e08aacStbbdev , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) 50149e08aacStbbdev , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) ) 50249e08aacStbbdev , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){ 50349e08aacStbbdev } 50449e08aacStbbdev 50549e08aacStbbdev //! Copy constructor 50649e08aacStbbdev multifunction_input( const multifunction_input& src ) : 50749e08aacStbbdev base_type(src), 50849e08aacStbbdev my_body( src.my_init_body->clone() ), 50949e08aacStbbdev my_init_body(src.my_init_body->clone() ), 51049e08aacStbbdev my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) { 51149e08aacStbbdev } 51249e08aacStbbdev 51349e08aacStbbdev ~multifunction_input() { 51449e08aacStbbdev delete my_body; 51549e08aacStbbdev delete my_init_body; 51649e08aacStbbdev } 51749e08aacStbbdev 51849e08aacStbbdev template< typename Body > 51949e08aacStbbdev Body copy_function_object() { 52049e08aacStbbdev multifunction_body_type &body_ref = *this->my_body; 52149e08aacStbbdev return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr()); 52249e08aacStbbdev } 52349e08aacStbbdev 52449e08aacStbbdev // for multifunction nodes we do not have a single successor as such. So we just tell 52549e08aacStbbdev // the task we were successful. 52649e08aacStbbdev //TODO: consider moving common parts with implementation in function_input into separate function 52749e08aacStbbdev graph_task* apply_body_impl_bypass( const input_type &i ) { 52849e08aacStbbdev fgt_begin_body( my_body ); 52949e08aacStbbdev (*my_body)(i, my_output_ports); 53049e08aacStbbdev fgt_end_body( my_body ); 53157f524caSIlya Isaev graph_task* ttask = nullptr; 53249e08aacStbbdev if(base_type::my_max_concurrency != 0) { 53349e08aacStbbdev ttask = base_type::try_get_postponed_task(i); 53449e08aacStbbdev } 53549e08aacStbbdev return ttask ? ttask : SUCCESSFULLY_ENQUEUED; 53649e08aacStbbdev } 53749e08aacStbbdev 53849e08aacStbbdev output_ports_type &output_ports(){ return my_output_ports; } 53949e08aacStbbdev 54049e08aacStbbdev protected: 54149e08aacStbbdev 54249e08aacStbbdev void reset(reset_flags f) { 54349e08aacStbbdev base_type::reset_function_input_base(f); 54449e08aacStbbdev if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports); 54549e08aacStbbdev if(f & rf_reset_bodies) { 54649e08aacStbbdev multifunction_body_type* tmp = my_init_body->clone(); 54749e08aacStbbdev delete my_body; 54849e08aacStbbdev my_body = tmp; 54949e08aacStbbdev } 55049e08aacStbbdev __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed"); 55149e08aacStbbdev } 55249e08aacStbbdev 55349e08aacStbbdev multifunction_body_type *my_body; 55449e08aacStbbdev multifunction_body_type *my_init_body; 55549e08aacStbbdev output_ports_type my_output_ports; 55649e08aacStbbdev 55749e08aacStbbdev }; // multifunction_input 55849e08aacStbbdev 55949e08aacStbbdev // template to refer to an output port of a multifunction_node 56049e08aacStbbdev template<size_t N, typename MOP> 56149e08aacStbbdev typename std::tuple_element<N, typename MOP::output_ports_type>::type &output_port(MOP &op) { 56249e08aacStbbdev return std::get<N>(op.output_ports()); 56349e08aacStbbdev } 56449e08aacStbbdev 56549e08aacStbbdev inline void check_task_and_spawn(graph& g, graph_task* t) { 56649e08aacStbbdev if (t && t != SUCCESSFULLY_ENQUEUED) { 56749e08aacStbbdev spawn_in_graph_arena(g, *t); 56849e08aacStbbdev } 56949e08aacStbbdev } 57049e08aacStbbdev 57149e08aacStbbdev // helper structs for split_node 57249e08aacStbbdev template<int N> 57349e08aacStbbdev struct emit_element { 57449e08aacStbbdev template<typename T, typename P> 57549e08aacStbbdev static graph_task* emit_this(graph& g, const T &t, P &p) { 57649e08aacStbbdev // TODO: consider to collect all the tasks in task_list and spawn them all at once 57749e08aacStbbdev graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t)); 57849e08aacStbbdev check_task_and_spawn(g, last_task); 57949e08aacStbbdev return emit_element<N-1>::emit_this(g,t,p); 58049e08aacStbbdev } 58149e08aacStbbdev }; 58249e08aacStbbdev 58349e08aacStbbdev template<> 58449e08aacStbbdev struct emit_element<1> { 58549e08aacStbbdev template<typename T, typename P> 58649e08aacStbbdev static graph_task* emit_this(graph& g, const T &t, P &p) { 58749e08aacStbbdev graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t)); 58849e08aacStbbdev check_task_and_spawn(g, last_task); 58949e08aacStbbdev return SUCCESSFULLY_ENQUEUED; 59049e08aacStbbdev } 59149e08aacStbbdev }; 59249e08aacStbbdev 59349e08aacStbbdev //! Implements methods for an executable node that takes continue_msg as input 59449e08aacStbbdev template< typename Output, typename Policy> 59549e08aacStbbdev class continue_input : public continue_receiver { 59649e08aacStbbdev public: 59749e08aacStbbdev 59849e08aacStbbdev //! The input type of this receiver 59949e08aacStbbdev typedef continue_msg input_type; 60049e08aacStbbdev 60149e08aacStbbdev //! The output type of this receiver 60249e08aacStbbdev typedef Output output_type; 60349e08aacStbbdev typedef function_body<input_type, output_type> function_body_type; 60449e08aacStbbdev typedef continue_input<output_type, Policy> class_type; 60549e08aacStbbdev 60649e08aacStbbdev template< typename Body > 60749e08aacStbbdev continue_input( graph &g, Body& body, node_priority_t a_priority ) 60849e08aacStbbdev : continue_receiver(/*number_of_predecessors=*/0, a_priority) 60949e08aacStbbdev , my_graph_ref(g) 61049e08aacStbbdev , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 61149e08aacStbbdev , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) 61249e08aacStbbdev { } 61349e08aacStbbdev 61449e08aacStbbdev template< typename Body > 61549e08aacStbbdev continue_input( graph &g, int number_of_predecessors, 61649e08aacStbbdev Body& body, node_priority_t a_priority ) 61749e08aacStbbdev : continue_receiver( number_of_predecessors, a_priority ) 61849e08aacStbbdev , my_graph_ref(g) 61949e08aacStbbdev , my_body( new function_body_leaf< input_type, output_type, Body>(body) ) 62049e08aacStbbdev , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) 62149e08aacStbbdev { } 62249e08aacStbbdev 62349e08aacStbbdev continue_input( const continue_input& src ) : continue_receiver(src), 62449e08aacStbbdev my_graph_ref(src.my_graph_ref), 62549e08aacStbbdev my_body( src.my_init_body->clone() ), 62649e08aacStbbdev my_init_body( src.my_init_body->clone() ) {} 62749e08aacStbbdev 62849e08aacStbbdev ~continue_input() { 62949e08aacStbbdev delete my_body; 63049e08aacStbbdev delete my_init_body; 63149e08aacStbbdev } 63249e08aacStbbdev 63349e08aacStbbdev template< typename Body > 63449e08aacStbbdev Body copy_function_object() { 63549e08aacStbbdev function_body_type &body_ref = *my_body; 63649e08aacStbbdev return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body(); 63749e08aacStbbdev } 63849e08aacStbbdev 63949e08aacStbbdev void reset_receiver( reset_flags f) override { 64049e08aacStbbdev continue_receiver::reset_receiver(f); 64149e08aacStbbdev if(f & rf_reset_bodies) { 64249e08aacStbbdev function_body_type *tmp = my_init_body->clone(); 64349e08aacStbbdev delete my_body; 64449e08aacStbbdev my_body = tmp; 64549e08aacStbbdev } 64649e08aacStbbdev } 64749e08aacStbbdev 64849e08aacStbbdev protected: 64949e08aacStbbdev 65049e08aacStbbdev graph& my_graph_ref; 65149e08aacStbbdev function_body_type *my_body; 65249e08aacStbbdev function_body_type *my_init_body; 65349e08aacStbbdev 65449e08aacStbbdev virtual broadcast_cache<output_type > &successors() = 0; 65549e08aacStbbdev 65649e08aacStbbdev friend class apply_body_task_bypass< class_type, continue_msg >; 65749e08aacStbbdev 65849e08aacStbbdev //! Applies the body to the provided input 65949e08aacStbbdev graph_task* apply_body_bypass( input_type ) { 66049e08aacStbbdev // There is an extra copied needed to capture the 66149e08aacStbbdev // body execution without the try_put 66249e08aacStbbdev fgt_begin_body( my_body ); 66349e08aacStbbdev output_type v = (*my_body)( continue_msg() ); 66449e08aacStbbdev fgt_end_body( my_body ); 66549e08aacStbbdev return successors().try_put_task( v ); 66649e08aacStbbdev } 66749e08aacStbbdev 66849e08aacStbbdev graph_task* execute() override { 66949e08aacStbbdev if(!is_graph_active(my_graph_ref)) { 67057f524caSIlya Isaev return nullptr; 67149e08aacStbbdev } 67249e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER 67349e08aacStbbdev #pragma warning (push) 67449e08aacStbbdev #pragma warning (disable: 4127) /* suppress conditional expression is constant */ 67549e08aacStbbdev #endif 67649e08aacStbbdev if(has_policy<lightweight, Policy>::value) { 67749e08aacStbbdev #if _MSC_VER && !__INTEL_COMPILER 67849e08aacStbbdev #pragma warning (pop) 67949e08aacStbbdev #endif 68049e08aacStbbdev return apply_body_bypass( continue_msg() ); 68149e08aacStbbdev } 68249e08aacStbbdev else { 68349e08aacStbbdev small_object_allocator allocator{}; 68449e08aacStbbdev typedef apply_body_task_bypass<class_type, continue_msg> task_type; 68549e08aacStbbdev graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority ); 68649e08aacStbbdev graph_reference().reserve_wait(); 68749e08aacStbbdev return t; 68849e08aacStbbdev } 68949e08aacStbbdev } 69049e08aacStbbdev 69149e08aacStbbdev graph& graph_reference() const override { 69249e08aacStbbdev return my_graph_ref; 69349e08aacStbbdev } 69449e08aacStbbdev }; // continue_input 69549e08aacStbbdev 69649e08aacStbbdev //! Implements methods for both executable and function nodes that puts Output to its successors 69749e08aacStbbdev template< typename Output > 69849e08aacStbbdev class function_output : public sender<Output> { 69949e08aacStbbdev public: 70049e08aacStbbdev 70149e08aacStbbdev template<int N> friend struct clear_element; 70249e08aacStbbdev typedef Output output_type; 70349e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type; 70449e08aacStbbdev typedef broadcast_cache<output_type> broadcast_cache_type; 70549e08aacStbbdev 70649e08aacStbbdev function_output(graph& g) : my_successors(this), my_graph_ref(g) {} 70749e08aacStbbdev function_output(const function_output& other) = delete; 70849e08aacStbbdev 70949e08aacStbbdev //! Adds a new successor to this node 71049e08aacStbbdev bool register_successor( successor_type &r ) override { 71149e08aacStbbdev successors().register_successor( r ); 71249e08aacStbbdev return true; 71349e08aacStbbdev } 71449e08aacStbbdev 71549e08aacStbbdev //! Removes a successor from this node 71649e08aacStbbdev bool remove_successor( successor_type &r ) override { 71749e08aacStbbdev successors().remove_successor( r ); 71849e08aacStbbdev return true; 71949e08aacStbbdev } 72049e08aacStbbdev 72149e08aacStbbdev broadcast_cache_type &successors() { return my_successors; } 72249e08aacStbbdev 72349e08aacStbbdev graph& graph_reference() const { return my_graph_ref; } 72449e08aacStbbdev protected: 72549e08aacStbbdev broadcast_cache_type my_successors; 72649e08aacStbbdev graph& my_graph_ref; 72749e08aacStbbdev }; // function_output 72849e08aacStbbdev 72949e08aacStbbdev template< typename Output > 73049e08aacStbbdev class multifunction_output : public function_output<Output> { 73149e08aacStbbdev public: 73249e08aacStbbdev typedef Output output_type; 73349e08aacStbbdev typedef function_output<output_type> base_type; 73449e08aacStbbdev using base_type::my_successors; 73549e08aacStbbdev 73649e08aacStbbdev multifunction_output(graph& g) : base_type(g) {} 73749e08aacStbbdev multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {} 73849e08aacStbbdev 73949e08aacStbbdev bool try_put(const output_type &i) { 74049e08aacStbbdev graph_task *res = try_put_task(i); 74149e08aacStbbdev if( !res ) return false; 74249e08aacStbbdev if( res != SUCCESSFULLY_ENQUEUED ) { 74349e08aacStbbdev // wrapping in task_arena::execute() is not needed since the method is called from 74449e08aacStbbdev // inside task::execute() 74549e08aacStbbdev spawn_in_graph_arena(graph_reference(), *res); 74649e08aacStbbdev } 74749e08aacStbbdev return true; 74849e08aacStbbdev } 74949e08aacStbbdev 75049e08aacStbbdev using base_type::graph_reference; 75149e08aacStbbdev 75249e08aacStbbdev protected: 75349e08aacStbbdev 75449e08aacStbbdev graph_task* try_put_task(const output_type &i) { 75549e08aacStbbdev return my_successors.try_put_task(i); 75649e08aacStbbdev } 75749e08aacStbbdev 75849e08aacStbbdev template <int N> friend struct emit_element; 75949e08aacStbbdev 76049e08aacStbbdev }; // multifunction_output 76149e08aacStbbdev 76249e08aacStbbdev //composite_node 76349e08aacStbbdev template<typename CompositeType> 76449e08aacStbbdev void add_nodes_impl(CompositeType*, bool) {} 76549e08aacStbbdev 76649e08aacStbbdev template< typename CompositeType, typename NodeType1, typename... NodeTypes > 76749e08aacStbbdev void add_nodes_impl(CompositeType *c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) { 76849e08aacStbbdev void *addr = const_cast<NodeType1 *>(&n1); 76949e08aacStbbdev 77049e08aacStbbdev fgt_alias_port(c_node, addr, visible); 77149e08aacStbbdev add_nodes_impl(c_node, visible, n...); 77249e08aacStbbdev } 77349e08aacStbbdev 77449e08aacStbbdev #endif // __TBB__flow_graph_node_impl_H 775