/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__flow_graph_cache_impl_H
#define __TBB__flow_graph_cache_impl_H

#ifndef __TBB_flow_graph_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

// included in namespace tbb::detail::d1 (in flow_graph.h)

//! A node_cache maintains a std::queue of pointers to elements of type T.
//! empty(), add() and remove() are protected by a lock; clear() is not.
2749e08aacStbbdev template< typename T, typename M=spin_mutex > 2849e08aacStbbdev class node_cache { 2949e08aacStbbdev public: 3049e08aacStbbdev 3149e08aacStbbdev typedef size_t size_type; 3249e08aacStbbdev empty()3349e08aacStbbdev bool empty() { 3449e08aacStbbdev typename mutex_type::scoped_lock lock( my_mutex ); 3549e08aacStbbdev return internal_empty(); 3649e08aacStbbdev } 3749e08aacStbbdev add(T & n)3849e08aacStbbdev void add( T &n ) { 3949e08aacStbbdev typename mutex_type::scoped_lock lock( my_mutex ); 4049e08aacStbbdev internal_push(n); 4149e08aacStbbdev } 4249e08aacStbbdev remove(T & n)4349e08aacStbbdev void remove( T &n ) { 4449e08aacStbbdev typename mutex_type::scoped_lock lock( my_mutex ); 4549e08aacStbbdev for ( size_t i = internal_size(); i != 0; --i ) { 4649e08aacStbbdev T &s = internal_pop(); 4749e08aacStbbdev if ( &s == &n ) 4849e08aacStbbdev break; // only remove one predecessor per request 4949e08aacStbbdev internal_push(s); 5049e08aacStbbdev } 5149e08aacStbbdev } 5249e08aacStbbdev clear()5349e08aacStbbdev void clear() { 5449e08aacStbbdev while( !my_q.empty()) (void)my_q.pop(); 5549e08aacStbbdev } 5649e08aacStbbdev 5749e08aacStbbdev protected: 5849e08aacStbbdev 5949e08aacStbbdev typedef M mutex_type; 6049e08aacStbbdev mutex_type my_mutex; 6149e08aacStbbdev std::queue< T * > my_q; 6249e08aacStbbdev 6349e08aacStbbdev // Assumes lock is held internal_empty()6449e08aacStbbdev inline bool internal_empty( ) { 6549e08aacStbbdev return my_q.empty(); 6649e08aacStbbdev } 6749e08aacStbbdev 6849e08aacStbbdev // Assumes lock is held internal_size()6949e08aacStbbdev inline size_type internal_size( ) { 7049e08aacStbbdev return my_q.size(); 7149e08aacStbbdev } 7249e08aacStbbdev 7349e08aacStbbdev // Assumes lock is held internal_push(T & n)7449e08aacStbbdev inline void internal_push( T &n ) { 7549e08aacStbbdev my_q.push(&n); 7649e08aacStbbdev } 7749e08aacStbbdev 7849e08aacStbbdev // Assumes lock is held internal_pop()7949e08aacStbbdev inline T &internal_pop() { 
8049e08aacStbbdev T *v = my_q.front(); 8149e08aacStbbdev my_q.pop(); 8249e08aacStbbdev return *v; 8349e08aacStbbdev } 8449e08aacStbbdev 8549e08aacStbbdev }; 8649e08aacStbbdev 8749e08aacStbbdev //! A cache of predecessors that only supports try_get 8849e08aacStbbdev template< typename T, typename M=spin_mutex > 8949e08aacStbbdev class predecessor_cache : public node_cache< sender<T>, M > { 9049e08aacStbbdev public: 9149e08aacStbbdev typedef M mutex_type; 9249e08aacStbbdev typedef T output_type; 9349e08aacStbbdev typedef sender<output_type> predecessor_type; 9449e08aacStbbdev typedef receiver<output_type> successor_type; 9549e08aacStbbdev predecessor_cache(successor_type * owner)9649e08aacStbbdev predecessor_cache( successor_type* owner ) : my_owner( owner ) { 9749e08aacStbbdev __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." ); 9849e08aacStbbdev // Do not work with the passed pointer here as it may not be fully initialized yet 9949e08aacStbbdev } 10049e08aacStbbdev get_item(output_type & v)10149e08aacStbbdev bool get_item( output_type& v ) { 10249e08aacStbbdev 10349e08aacStbbdev bool msg = false; 10449e08aacStbbdev 10549e08aacStbbdev do { 10649e08aacStbbdev predecessor_type *src; 10749e08aacStbbdev { 10849e08aacStbbdev typename mutex_type::scoped_lock lock(this->my_mutex); 10949e08aacStbbdev if ( this->internal_empty() ) { 11049e08aacStbbdev break; 11149e08aacStbbdev } 11249e08aacStbbdev src = &this->internal_pop(); 11349e08aacStbbdev } 11449e08aacStbbdev 11549e08aacStbbdev // Try to get from this sender 11649e08aacStbbdev msg = src->try_get( v ); 11749e08aacStbbdev 11849e08aacStbbdev if (msg == false) { 11949e08aacStbbdev // Relinquish ownership of the edge 12049e08aacStbbdev register_successor(*src, *my_owner); 12149e08aacStbbdev } else { 12249e08aacStbbdev // Retain ownership of the edge 12349e08aacStbbdev this->add(*src); 12449e08aacStbbdev } 12549e08aacStbbdev } while ( msg == false ); 12649e08aacStbbdev return msg; 12749e08aacStbbdev } 
12849e08aacStbbdev 12949e08aacStbbdev // If we are removing arcs (rf_clear_edges), call clear() rather than reset(). reset()13049e08aacStbbdev void reset() { 13149e08aacStbbdev for(;;) { 13249e08aacStbbdev predecessor_type *src; 13349e08aacStbbdev { 13449e08aacStbbdev if (this->internal_empty()) break; 13549e08aacStbbdev src = &this->internal_pop(); 13649e08aacStbbdev } 13749e08aacStbbdev register_successor(*src, *my_owner); 13849e08aacStbbdev } 13949e08aacStbbdev } 14049e08aacStbbdev 14149e08aacStbbdev protected: 14249e08aacStbbdev successor_type* my_owner; 14349e08aacStbbdev }; 14449e08aacStbbdev 14549e08aacStbbdev //! An cache of predecessors that supports requests and reservations 14649e08aacStbbdev template< typename T, typename M=spin_mutex > 14749e08aacStbbdev class reservable_predecessor_cache : public predecessor_cache< T, M > { 14849e08aacStbbdev public: 14949e08aacStbbdev typedef M mutex_type; 15049e08aacStbbdev typedef T output_type; 15149e08aacStbbdev typedef sender<T> predecessor_type; 15249e08aacStbbdev typedef receiver<T> successor_type; 15349e08aacStbbdev reservable_predecessor_cache(successor_type * owner)15449e08aacStbbdev reservable_predecessor_cache( successor_type* owner ) 1558b6f831cStbbdev : predecessor_cache<T,M>(owner), reserved_src(nullptr) 15649e08aacStbbdev { 15749e08aacStbbdev // Do not work with the passed pointer here as it may not be fully initialized yet 15849e08aacStbbdev } 15949e08aacStbbdev try_reserve(output_type & v)1608b6f831cStbbdev bool try_reserve( output_type &v ) { 16149e08aacStbbdev bool msg = false; 16249e08aacStbbdev 16349e08aacStbbdev do { 1648b6f831cStbbdev predecessor_type* pred = nullptr; 16549e08aacStbbdev { 16649e08aacStbbdev typename mutex_type::scoped_lock lock(this->my_mutex); 1678b6f831cStbbdev if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() ) 16849e08aacStbbdev return false; 16949e08aacStbbdev 1708b6f831cStbbdev pred = &this->internal_pop(); 1718b6f831cStbbdev 
reserved_src.store(pred, std::memory_order_relaxed); 17249e08aacStbbdev } 17349e08aacStbbdev 17449e08aacStbbdev // Try to get from this sender 1758b6f831cStbbdev msg = pred->try_reserve( v ); 17649e08aacStbbdev 17749e08aacStbbdev if (msg == false) { 17849e08aacStbbdev typename mutex_type::scoped_lock lock(this->my_mutex); 17949e08aacStbbdev // Relinquish ownership of the edge 1808b6f831cStbbdev register_successor( *pred, *this->my_owner ); 1818b6f831cStbbdev reserved_src.store(nullptr, std::memory_order_relaxed); 18249e08aacStbbdev } else { 18349e08aacStbbdev // Retain ownership of the edge 1848b6f831cStbbdev this->add( *pred); 18549e08aacStbbdev } 18649e08aacStbbdev } while ( msg == false ); 18749e08aacStbbdev 18849e08aacStbbdev return msg; 18949e08aacStbbdev } 19049e08aacStbbdev try_release()1918b6f831cStbbdev bool try_release() { 1928b6f831cStbbdev reserved_src.load(std::memory_order_relaxed)->try_release(); 1938b6f831cStbbdev reserved_src.store(nullptr, std::memory_order_relaxed); 19449e08aacStbbdev return true; 19549e08aacStbbdev } 19649e08aacStbbdev try_consume()1978b6f831cStbbdev bool try_consume() { 1988b6f831cStbbdev reserved_src.load(std::memory_order_relaxed)->try_consume(); 1998b6f831cStbbdev reserved_src.store(nullptr, std::memory_order_relaxed); 20049e08aacStbbdev return true; 20149e08aacStbbdev } 20249e08aacStbbdev reset()20349e08aacStbbdev void reset() { 2048b6f831cStbbdev reserved_src.store(nullptr, std::memory_order_relaxed); 20549e08aacStbbdev predecessor_cache<T, M>::reset(); 20649e08aacStbbdev } 20749e08aacStbbdev clear()20849e08aacStbbdev void clear() { 2098b6f831cStbbdev reserved_src.store(nullptr, std::memory_order_relaxed); 21049e08aacStbbdev predecessor_cache<T, M>::clear(); 21149e08aacStbbdev } 21249e08aacStbbdev 21349e08aacStbbdev private: 2148b6f831cStbbdev std::atomic<predecessor_type*> reserved_src; 21549e08aacStbbdev }; 21649e08aacStbbdev 21749e08aacStbbdev 21849e08aacStbbdev //! 
An abstract cache of successors 21949e08aacStbbdev template<typename T, typename M=spin_rw_mutex > 22049e08aacStbbdev class successor_cache : no_copy { 22149e08aacStbbdev protected: 22249e08aacStbbdev 22349e08aacStbbdev typedef M mutex_type; 22449e08aacStbbdev mutex_type my_mutex; 22549e08aacStbbdev 22649e08aacStbbdev typedef receiver<T> successor_type; 22749e08aacStbbdev typedef receiver<T>* pointer_type; 22849e08aacStbbdev typedef sender<T> owner_type; 22949e08aacStbbdev // TODO revamp: introduce heapified collection of successors for strict priorities 23049e08aacStbbdev typedef std::list< pointer_type > successors_type; 23149e08aacStbbdev successors_type my_successors; 23249e08aacStbbdev 23349e08aacStbbdev owner_type* my_owner; 23449e08aacStbbdev 23549e08aacStbbdev public: successor_cache(owner_type * owner)23649e08aacStbbdev successor_cache( owner_type* owner ) : my_owner(owner) { 23749e08aacStbbdev // Do not work with the passed pointer here as it may not be fully initialized yet 23849e08aacStbbdev } 23949e08aacStbbdev ~successor_cache()24049e08aacStbbdev virtual ~successor_cache() {} 24149e08aacStbbdev register_successor(successor_type & r)24249e08aacStbbdev void register_successor( successor_type& r ) { 24349e08aacStbbdev typename mutex_type::scoped_lock l(my_mutex, true); 24449e08aacStbbdev if( r.priority() != no_priority ) 24549e08aacStbbdev my_successors.push_front( &r ); 24649e08aacStbbdev else 24749e08aacStbbdev my_successors.push_back( &r ); 24849e08aacStbbdev } 24949e08aacStbbdev remove_successor(successor_type & r)25049e08aacStbbdev void remove_successor( successor_type& r ) { 25149e08aacStbbdev typename mutex_type::scoped_lock l(my_mutex, true); 25249e08aacStbbdev for ( typename successors_type::iterator i = my_successors.begin(); 25349e08aacStbbdev i != my_successors.end(); ++i ) { 25449e08aacStbbdev if ( *i == & r ) { 25549e08aacStbbdev my_successors.erase(i); 25649e08aacStbbdev break; 25749e08aacStbbdev } 25849e08aacStbbdev } 25949e08aacStbbdev } 
26049e08aacStbbdev empty()26149e08aacStbbdev bool empty() { 26249e08aacStbbdev typename mutex_type::scoped_lock l(my_mutex, false); 26349e08aacStbbdev return my_successors.empty(); 26449e08aacStbbdev } 26549e08aacStbbdev clear()26649e08aacStbbdev void clear() { 26749e08aacStbbdev my_successors.clear(); 26849e08aacStbbdev } 26949e08aacStbbdev 27049e08aacStbbdev virtual graph_task* try_put_task( const T& t ) = 0; 27149e08aacStbbdev }; // successor_cache<T> 27249e08aacStbbdev 27349e08aacStbbdev //! An abstract cache of successors, specialized to continue_msg 27449e08aacStbbdev template<typename M> 27549e08aacStbbdev class successor_cache< continue_msg, M > : no_copy { 27649e08aacStbbdev protected: 27749e08aacStbbdev 27849e08aacStbbdev typedef M mutex_type; 27949e08aacStbbdev mutex_type my_mutex; 28049e08aacStbbdev 28149e08aacStbbdev typedef receiver<continue_msg> successor_type; 28249e08aacStbbdev typedef receiver<continue_msg>* pointer_type; 28349e08aacStbbdev typedef sender<continue_msg> owner_type; 28449e08aacStbbdev typedef std::list< pointer_type > successors_type; 28549e08aacStbbdev successors_type my_successors; 28649e08aacStbbdev owner_type* my_owner; 28749e08aacStbbdev 28849e08aacStbbdev public: successor_cache(sender<continue_msg> * owner)28949e08aacStbbdev successor_cache( sender<continue_msg>* owner ) : my_owner(owner) { 29049e08aacStbbdev // Do not work with the passed pointer here as it may not be fully initialized yet 29149e08aacStbbdev } 29249e08aacStbbdev ~successor_cache()29349e08aacStbbdev virtual ~successor_cache() {} 29449e08aacStbbdev register_successor(successor_type & r)29549e08aacStbbdev void register_successor( successor_type& r ) { 29649e08aacStbbdev typename mutex_type::scoped_lock l(my_mutex, true); 29749e08aacStbbdev if( r.priority() != no_priority ) 29849e08aacStbbdev my_successors.push_front( &r ); 29949e08aacStbbdev else 30049e08aacStbbdev my_successors.push_back( &r ); 30149e08aacStbbdev __TBB_ASSERT( my_owner, "Cache of successors 
must have an owner." ); 30249e08aacStbbdev if ( r.is_continue_receiver() ) { 30349e08aacStbbdev r.register_predecessor( *my_owner ); 30449e08aacStbbdev } 30549e08aacStbbdev } 30649e08aacStbbdev remove_successor(successor_type & r)30749e08aacStbbdev void remove_successor( successor_type& r ) { 30849e08aacStbbdev typename mutex_type::scoped_lock l(my_mutex, true); 30949e08aacStbbdev for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) { 31049e08aacStbbdev if ( *i == &r ) { 31149e08aacStbbdev __TBB_ASSERT(my_owner, "Cache of successors must have an owner."); 31249e08aacStbbdev // TODO: check if we need to test for continue_receiver before removing from r. 31349e08aacStbbdev r.remove_predecessor( *my_owner ); 31449e08aacStbbdev my_successors.erase(i); 31549e08aacStbbdev break; 31649e08aacStbbdev } 31749e08aacStbbdev } 31849e08aacStbbdev } 31949e08aacStbbdev empty()32049e08aacStbbdev bool empty() { 32149e08aacStbbdev typename mutex_type::scoped_lock l(my_mutex, false); 32249e08aacStbbdev return my_successors.empty(); 32349e08aacStbbdev } 32449e08aacStbbdev clear()32549e08aacStbbdev void clear() { 32649e08aacStbbdev my_successors.clear(); 32749e08aacStbbdev } 32849e08aacStbbdev 32949e08aacStbbdev virtual graph_task* try_put_task( const continue_msg& t ) = 0; 33049e08aacStbbdev }; // successor_cache< continue_msg > 33149e08aacStbbdev 33249e08aacStbbdev //! 
A cache of successors that are broadcast to 33349e08aacStbbdev template<typename T, typename M=spin_rw_mutex> 33449e08aacStbbdev class broadcast_cache : public successor_cache<T, M> { 33549e08aacStbbdev typedef successor_cache<T, M> base_type; 33649e08aacStbbdev typedef M mutex_type; 33749e08aacStbbdev typedef typename successor_cache<T,M>::successors_type successors_type; 33849e08aacStbbdev 33949e08aacStbbdev public: 34049e08aacStbbdev broadcast_cache(typename base_type::owner_type * owner)34149e08aacStbbdev broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) { 34249e08aacStbbdev // Do not work with the passed pointer here as it may not be fully initialized yet 34349e08aacStbbdev } 34449e08aacStbbdev 34549e08aacStbbdev // as above, but call try_put_task instead, and return the last task we received (if any) try_put_task(const T & t)34649e08aacStbbdev graph_task* try_put_task( const T &t ) override { 34749e08aacStbbdev graph_task * last_task = nullptr; 34849e08aacStbbdev typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); 34949e08aacStbbdev typename successors_type::iterator i = this->my_successors.begin(); 35049e08aacStbbdev while ( i != this->my_successors.end() ) { 35149e08aacStbbdev graph_task *new_task = (*i)->try_put_task(t); 35249e08aacStbbdev // workaround for icc bug 35349e08aacStbbdev graph& graph_ref = (*i)->graph_reference(); 35449e08aacStbbdev last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary 35549e08aacStbbdev if(new_task) { 35649e08aacStbbdev ++i; 35749e08aacStbbdev } 35849e08aacStbbdev else { // failed 35949e08aacStbbdev if ( (*i)->register_predecessor(*this->my_owner) ) { 36049e08aacStbbdev i = this->my_successors.erase(i); 36149e08aacStbbdev } else { 36249e08aacStbbdev ++i; 36349e08aacStbbdev } 36449e08aacStbbdev } 36549e08aacStbbdev } 36649e08aacStbbdev return last_task; 36749e08aacStbbdev } 36849e08aacStbbdev 36949e08aacStbbdev // call try_put_task and return list of 
received tasks gather_successful_try_puts(const T & t,graph_task_list & tasks)37049e08aacStbbdev bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) { 37149e08aacStbbdev bool is_at_least_one_put_successful = false; 37249e08aacStbbdev typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); 37349e08aacStbbdev typename successors_type::iterator i = this->my_successors.begin(); 37449e08aacStbbdev while ( i != this->my_successors.end() ) { 37549e08aacStbbdev graph_task * new_task = (*i)->try_put_task(t); 37649e08aacStbbdev if(new_task) { 37749e08aacStbbdev ++i; 37849e08aacStbbdev if(new_task != SUCCESSFULLY_ENQUEUED) { 37949e08aacStbbdev tasks.push_back(*new_task); 38049e08aacStbbdev } 38149e08aacStbbdev is_at_least_one_put_successful = true; 38249e08aacStbbdev } 38349e08aacStbbdev else { // failed 38449e08aacStbbdev if ( (*i)->register_predecessor(*this->my_owner) ) { 38549e08aacStbbdev i = this->my_successors.erase(i); 38649e08aacStbbdev } else { 38749e08aacStbbdev ++i; 38849e08aacStbbdev } 38949e08aacStbbdev } 39049e08aacStbbdev } 39149e08aacStbbdev return is_at_least_one_put_successful; 39249e08aacStbbdev } 39349e08aacStbbdev }; 39449e08aacStbbdev 39549e08aacStbbdev //! 
A cache of successors that are put in a round-robin fashion 39649e08aacStbbdev template<typename T, typename M=spin_rw_mutex > 39749e08aacStbbdev class round_robin_cache : public successor_cache<T, M> { 39849e08aacStbbdev typedef successor_cache<T, M> base_type; 39949e08aacStbbdev typedef size_t size_type; 40049e08aacStbbdev typedef M mutex_type; 40149e08aacStbbdev typedef typename successor_cache<T,M>::successors_type successors_type; 40249e08aacStbbdev 40349e08aacStbbdev public: 40449e08aacStbbdev round_robin_cache(typename base_type::owner_type * owner)40549e08aacStbbdev round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) { 40649e08aacStbbdev // Do not work with the passed pointer here as it may not be fully initialized yet 40749e08aacStbbdev } 40849e08aacStbbdev size()40949e08aacStbbdev size_type size() { 41049e08aacStbbdev typename mutex_type::scoped_lock l(this->my_mutex, false); 41149e08aacStbbdev return this->my_successors.size(); 41249e08aacStbbdev } 41349e08aacStbbdev try_put_task(const T & t)41449e08aacStbbdev graph_task* try_put_task( const T &t ) override { 41549e08aacStbbdev typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true); 41649e08aacStbbdev typename successors_type::iterator i = this->my_successors.begin(); 41749e08aacStbbdev while ( i != this->my_successors.end() ) { 41849e08aacStbbdev graph_task* new_task = (*i)->try_put_task(t); 41949e08aacStbbdev if ( new_task ) { 42049e08aacStbbdev return new_task; 42149e08aacStbbdev } else { 42249e08aacStbbdev if ( (*i)->register_predecessor(*this->my_owner) ) { 42349e08aacStbbdev i = this->my_successors.erase(i); 42449e08aacStbbdev } 42549e08aacStbbdev else { 42649e08aacStbbdev ++i; 42749e08aacStbbdev } 42849e08aacStbbdev } 42949e08aacStbbdev } 43057f524caSIlya Isaev return nullptr; 43149e08aacStbbdev } 43249e08aacStbbdev }; 43349e08aacStbbdev 43449e08aacStbbdev #endif // __TBB__flow_graph_cache_impl_H 435