xref: /oneTBB/src/tbb/task_stream.h (revision bc48c4a0)
151c0b2f7Stbbdev /*
2c21e688aSSergey Zheltov     Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #ifndef _TBB_task_stream_H
1851c0b2f7Stbbdev #define _TBB_task_stream_H
1951c0b2f7Stbbdev 
//! This file is a possible future replacement for the task_stream class implemented in
//! task_stream.h. It refactors the code and extends task_stream capabilities by moving lane
//! management during operations to the caller's side. Although the new implementation should not
//! affect the performance of the original task stream, no analysis of this was made at the
//! time it was developed. In addition, it is not clear at the moment whether this container
//! would be suitable for critical tasks, due to the linear time complexity of its operations.
2651c0b2f7Stbbdev 
2749e08aacStbbdev #include "oneapi/tbb/detail/_utils.h"
2849e08aacStbbdev #include "oneapi/tbb/cache_aligned_allocator.h"
294523a761Stbbdev #include "oneapi/tbb/mutex.h"
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev #include "scheduler_common.h"
3251c0b2f7Stbbdev #include "misc.h" // for FastRandom
3351c0b2f7Stbbdev 
3451c0b2f7Stbbdev #include <deque>
3551c0b2f7Stbbdev #include <climits>
3651c0b2f7Stbbdev #include <atomic>
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev namespace tbb {
3951c0b2f7Stbbdev namespace detail {
4051c0b2f7Stbbdev namespace r1 {
4151c0b2f7Stbbdev 
4251c0b2f7Stbbdev //! Essentially, this is just a pair of a queue and a mutex to protect the queue.
4351c0b2f7Stbbdev /** The reason std::pair is not used is that the code would look less clean
4451c0b2f7Stbbdev     if field names were replaced with 'first' and 'second'. **/
4551c0b2f7Stbbdev template< typename T, typename mutex_t >
alignas(max_nfs_size)4651c0b2f7Stbbdev struct alignas(max_nfs_size) queue_and_mutex {
4751c0b2f7Stbbdev     typedef std::deque< T, cache_aligned_allocator<T> > queue_base_t;
4851c0b2f7Stbbdev 
4951c0b2f7Stbbdev     queue_base_t my_queue{};
5051c0b2f7Stbbdev     mutex_t      my_mutex{};
5151c0b2f7Stbbdev };
5251c0b2f7Stbbdev 
//! Unsigned word used as a bit mask with one bit per lane.
using population_t = uintptr_t;
//! The value 1 typed as population_t, so that shifts are performed at full mask width.
const population_t one = population_t(1);
5551c0b2f7Stbbdev 
set_one_bit(std::atomic<population_t> & dest,int pos)5651c0b2f7Stbbdev inline void set_one_bit( std::atomic<population_t>& dest, int pos ) {
5757f524caSIlya Isaev     __TBB_ASSERT( pos>=0, nullptr);
5857f524caSIlya Isaev     __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
5951c0b2f7Stbbdev     dest.fetch_or( one<<pos );
6051c0b2f7Stbbdev }
6151c0b2f7Stbbdev 
clear_one_bit(std::atomic<population_t> & dest,int pos)6251c0b2f7Stbbdev inline void clear_one_bit( std::atomic<population_t>& dest, int pos ) {
6357f524caSIlya Isaev     __TBB_ASSERT( pos>=0, nullptr);
6457f524caSIlya Isaev     __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
6551c0b2f7Stbbdev     dest.fetch_and( ~(one<<pos) );
6651c0b2f7Stbbdev }
6751c0b2f7Stbbdev 
is_bit_set(population_t val,int pos)6851c0b2f7Stbbdev inline bool is_bit_set( population_t val, int pos ) {
6957f524caSIlya Isaev     __TBB_ASSERT( pos>=0, nullptr);
7057f524caSIlya Isaev     __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
7151c0b2f7Stbbdev     return (val & (one<<pos)) != 0;
7251c0b2f7Stbbdev }
7351c0b2f7Stbbdev 
7451c0b2f7Stbbdev struct random_lane_selector :
7551c0b2f7Stbbdev #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
7651c0b2f7Stbbdev         no_assign
7751c0b2f7Stbbdev #else
7851c0b2f7Stbbdev         no_copy
7951c0b2f7Stbbdev #endif
8051c0b2f7Stbbdev {
random_lane_selectorrandom_lane_selector8151c0b2f7Stbbdev     random_lane_selector( FastRandom& random ) : my_random( random ) {}
operatorrandom_lane_selector8251c0b2f7Stbbdev     unsigned operator()( unsigned out_of ) const {
8351c0b2f7Stbbdev         __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
8451c0b2f7Stbbdev         return my_random.get() & (out_of-1);
8551c0b2f7Stbbdev     }
8651c0b2f7Stbbdev private:
8751c0b2f7Stbbdev     FastRandom& my_random;
8851c0b2f7Stbbdev };
8951c0b2f7Stbbdev 
9051c0b2f7Stbbdev struct lane_selector_base :
9151c0b2f7Stbbdev #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
9251c0b2f7Stbbdev         no_assign
9351c0b2f7Stbbdev #else
9451c0b2f7Stbbdev         no_copy
9551c0b2f7Stbbdev #endif
9651c0b2f7Stbbdev {
9751c0b2f7Stbbdev     unsigned& my_previous;
lane_selector_baselane_selector_base9851c0b2f7Stbbdev     lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
9951c0b2f7Stbbdev };
10051c0b2f7Stbbdev 
10151c0b2f7Stbbdev struct subsequent_lane_selector : lane_selector_base {
subsequent_lane_selectorsubsequent_lane_selector10251c0b2f7Stbbdev     subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
operatorsubsequent_lane_selector10351c0b2f7Stbbdev     unsigned operator()( unsigned out_of ) const {
10451c0b2f7Stbbdev         __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
10551c0b2f7Stbbdev         return (++my_previous &= out_of-1);
10651c0b2f7Stbbdev     }
10751c0b2f7Stbbdev };
10851c0b2f7Stbbdev 
10951c0b2f7Stbbdev struct preceding_lane_selector : lane_selector_base {
preceding_lane_selectorpreceding_lane_selector11051c0b2f7Stbbdev     preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
operatorpreceding_lane_selector11151c0b2f7Stbbdev     unsigned operator()( unsigned out_of ) const {
11251c0b2f7Stbbdev         __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
11351c0b2f7Stbbdev         return (--my_previous &= (out_of-1));
11451c0b2f7Stbbdev     }
11551c0b2f7Stbbdev };
11651c0b2f7Stbbdev 
11751c0b2f7Stbbdev //! Specializes from which side of the underlying container elements are retrieved. Method must be
11851c0b2f7Stbbdev //! called under corresponding mutex locked.
11951c0b2f7Stbbdev template<task_stream_accessor_type accessor>
12051c0b2f7Stbbdev class task_stream_accessor : no_copy {
12151c0b2f7Stbbdev protected:
1224523a761Stbbdev     using lane_t = queue_and_mutex <d1::task*, mutex>;
get_item(lane_t::queue_base_t & queue)12351c0b2f7Stbbdev     d1::task* get_item( lane_t::queue_base_t& queue ) {
12451c0b2f7Stbbdev         d1::task* result = queue.front();
12551c0b2f7Stbbdev         queue.pop_front();
12651c0b2f7Stbbdev         return result;
12751c0b2f7Stbbdev     }
12851c0b2f7Stbbdev };
12951c0b2f7Stbbdev 
13051c0b2f7Stbbdev template<>
13151c0b2f7Stbbdev class task_stream_accessor< back_nonnull_accessor > : no_copy {
13251c0b2f7Stbbdev protected:
1334523a761Stbbdev     using lane_t = queue_and_mutex <d1::task*, mutex>;
get_item(lane_t::queue_base_t & queue)13451c0b2f7Stbbdev     d1::task* get_item( lane_t::queue_base_t& queue ) {
13551c0b2f7Stbbdev         d1::task* result = nullptr;
13651c0b2f7Stbbdev         __TBB_ASSERT(!queue.empty(), nullptr);
13751c0b2f7Stbbdev         // Isolated task can put zeros in queue see look_specific
13851c0b2f7Stbbdev         do {
13951c0b2f7Stbbdev             result = queue.back();
14051c0b2f7Stbbdev             queue.pop_back();
14151c0b2f7Stbbdev         } while ( !result && !queue.empty() );
14251c0b2f7Stbbdev         return result;
14351c0b2f7Stbbdev     }
14451c0b2f7Stbbdev };
14551c0b2f7Stbbdev 
14651c0b2f7Stbbdev //! The container for "fairness-oriented" aka "enqueued" tasks.
14751c0b2f7Stbbdev template<task_stream_accessor_type accessor>
14851c0b2f7Stbbdev class task_stream : public task_stream_accessor< accessor > {
14951c0b2f7Stbbdev     using lane_t = typename task_stream_accessor<accessor>::lane_t;
15051c0b2f7Stbbdev     std::atomic<population_t> population{};
15151c0b2f7Stbbdev     lane_t* lanes{nullptr};
15251c0b2f7Stbbdev     unsigned N{};
15351c0b2f7Stbbdev 
15451c0b2f7Stbbdev public:
15551c0b2f7Stbbdev     task_stream() = default;
15651c0b2f7Stbbdev 
initialize(unsigned n_lanes)15751c0b2f7Stbbdev     void initialize( unsigned n_lanes ) {
15851c0b2f7Stbbdev         const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;
15951c0b2f7Stbbdev 
16051c0b2f7Stbbdev         N = n_lanes >= max_lanes ? max_lanes : n_lanes > 2 ? 1 << (tbb::detail::log2(n_lanes - 1) + 1) : 2;
161b15aabb3Stbbdev         __TBB_ASSERT( N == max_lanes || (N >= n_lanes && ((N - 1) & N) == 0), "number of lanes miscalculated" );
16257f524caSIlya Isaev         __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, nullptr);
16351c0b2f7Stbbdev         lanes = static_cast<lane_t*>(cache_aligned_allocate(sizeof(lane_t) * N));
16451c0b2f7Stbbdev         for (unsigned i = 0; i < N; ++i) {
16551c0b2f7Stbbdev             new (lanes + i) lane_t;
16651c0b2f7Stbbdev         }
16757f524caSIlya Isaev         __TBB_ASSERT( !population.load(std::memory_order_relaxed), nullptr);
16851c0b2f7Stbbdev     }
16951c0b2f7Stbbdev 
~task_stream()17051c0b2f7Stbbdev     ~task_stream() {
171b15aabb3Stbbdev         if (lanes) {
17251c0b2f7Stbbdev             for (unsigned i = 0; i < N; ++i) {
17351c0b2f7Stbbdev                 lanes[i].~lane_t();
17451c0b2f7Stbbdev             }
17551c0b2f7Stbbdev             cache_aligned_deallocate(lanes);
17651c0b2f7Stbbdev         }
177b15aabb3Stbbdev     }
17851c0b2f7Stbbdev 
17951c0b2f7Stbbdev     //! Push a task into a lane. Lane selection is performed by passed functor.
18051c0b2f7Stbbdev     template<typename lane_selector_t>
push(d1::task * source,const lane_selector_t & next_lane)18151c0b2f7Stbbdev     void push(d1::task* source, const lane_selector_t& next_lane ) {
18251c0b2f7Stbbdev         bool succeed = false;
18351c0b2f7Stbbdev         unsigned lane = 0;
18451c0b2f7Stbbdev         do {
18551c0b2f7Stbbdev             lane = next_lane( /*out_of=*/N );
18651c0b2f7Stbbdev             __TBB_ASSERT( lane < N, "Incorrect lane index." );
18751c0b2f7Stbbdev         } while( ! (succeed = try_push( source, lane )) );
18851c0b2f7Stbbdev     }
18951c0b2f7Stbbdev 
19051c0b2f7Stbbdev     //! Try finding and popping a task using passed functor for lane selection. Last used lane is
19151c0b2f7Stbbdev     //! updated inside lane selector.
19251c0b2f7Stbbdev     template<typename lane_selector_t>
pop(const lane_selector_t & next_lane)19351c0b2f7Stbbdev     d1::task* pop( const lane_selector_t& next_lane ) {
19457f524caSIlya Isaev         d1::task* popped = nullptr;
19551c0b2f7Stbbdev         unsigned lane = 0;
196*bc48c4a0SIlya Isaev         for (atomic_backoff b; !empty() && !popped; b.pause()) {
19751c0b2f7Stbbdev             lane = next_lane( /*out_of=*/N);
19851c0b2f7Stbbdev             __TBB_ASSERT(lane < N, "Incorrect lane index.");
199*bc48c4a0SIlya Isaev             popped = try_pop(lane);
200*bc48c4a0SIlya Isaev         }
20151c0b2f7Stbbdev         return popped;
20251c0b2f7Stbbdev     }
20351c0b2f7Stbbdev 
20451c0b2f7Stbbdev     //! Try finding and popping a related task.
pop_specific(unsigned & last_used_lane,isolation_type isolation)20551c0b2f7Stbbdev     d1::task* pop_specific( unsigned& last_used_lane, isolation_type isolation ) {
20657f524caSIlya Isaev         d1::task* result = nullptr;
20751c0b2f7Stbbdev         // Lane selection is round-robin in backward direction.
20851c0b2f7Stbbdev         unsigned idx = last_used_lane & (N-1);
20951c0b2f7Stbbdev         do {
21051c0b2f7Stbbdev             if( is_bit_set( population.load(std::memory_order_relaxed), idx ) ) {
21151c0b2f7Stbbdev                 lane_t& lane = lanes[idx];
2124523a761Stbbdev                 mutex::scoped_lock lock;
21351c0b2f7Stbbdev                 if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
21451c0b2f7Stbbdev                     result = look_specific( lane.my_queue, isolation );
21551c0b2f7Stbbdev                     if( lane.my_queue.empty() )
21651c0b2f7Stbbdev                         clear_one_bit( population, idx );
21751c0b2f7Stbbdev                     if( result )
21851c0b2f7Stbbdev                         break;
21951c0b2f7Stbbdev                 }
22051c0b2f7Stbbdev             }
22151c0b2f7Stbbdev             idx=(idx-1)&(N-1);
22251c0b2f7Stbbdev         } while( !empty() && idx != last_used_lane );
22351c0b2f7Stbbdev         last_used_lane = idx;
22451c0b2f7Stbbdev         return result;
22551c0b2f7Stbbdev     }
22651c0b2f7Stbbdev 
22751c0b2f7Stbbdev     //! Checks existence of a task.
empty()22851c0b2f7Stbbdev     bool empty() {
22951c0b2f7Stbbdev         return !population.load(std::memory_order_relaxed);
23051c0b2f7Stbbdev     }
23151c0b2f7Stbbdev 
23251c0b2f7Stbbdev private:
23351c0b2f7Stbbdev     //! Returns true on successful push, otherwise - false.
try_push(d1::task * source,unsigned lane_idx)23451c0b2f7Stbbdev     bool try_push(d1::task* source, unsigned lane_idx ) {
2354523a761Stbbdev         mutex::scoped_lock lock;
23651c0b2f7Stbbdev         if( lock.try_acquire( lanes[lane_idx].my_mutex ) ) {
23751c0b2f7Stbbdev             lanes[lane_idx].my_queue.push_back( source );
23851c0b2f7Stbbdev             set_one_bit( population, lane_idx ); // TODO: avoid atomic op if the bit is already set
23951c0b2f7Stbbdev             return true;
24051c0b2f7Stbbdev         }
24151c0b2f7Stbbdev         return false;
24251c0b2f7Stbbdev     }
24351c0b2f7Stbbdev 
24457f524caSIlya Isaev     //! Returns pointer to task on successful pop, otherwise - nullptr.
try_pop(unsigned lane_idx)24551c0b2f7Stbbdev     d1::task* try_pop( unsigned lane_idx ) {
24651c0b2f7Stbbdev         if( !is_bit_set( population.load(std::memory_order_relaxed), lane_idx ) )
24757f524caSIlya Isaev             return nullptr;
24857f524caSIlya Isaev         d1::task* result = nullptr;
24951c0b2f7Stbbdev         lane_t& lane = lanes[lane_idx];
2504523a761Stbbdev         mutex::scoped_lock lock;
25151c0b2f7Stbbdev         if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
25251c0b2f7Stbbdev             result = this->get_item( lane.my_queue );
25351c0b2f7Stbbdev             if( lane.my_queue.empty() )
25451c0b2f7Stbbdev                 clear_one_bit( population, lane_idx );
25551c0b2f7Stbbdev         }
25651c0b2f7Stbbdev         return result;
25751c0b2f7Stbbdev     }
25851c0b2f7Stbbdev 
25951c0b2f7Stbbdev     // TODO: unify '*_specific' logic with 'pop' methods above
look_specific(typename lane_t::queue_base_t & queue,isolation_type isolation)26051c0b2f7Stbbdev     d1::task* look_specific( typename lane_t::queue_base_t& queue, isolation_type isolation ) {
26157f524caSIlya Isaev         __TBB_ASSERT( !queue.empty(), nullptr);
26251c0b2f7Stbbdev         // TODO: add a worst-case performance test and consider an alternative container with better
26351c0b2f7Stbbdev         // performance for isolation search.
26451c0b2f7Stbbdev         typename lane_t::queue_base_t::iterator curr = queue.end();
26551c0b2f7Stbbdev         do {
26651c0b2f7Stbbdev             // TODO: consider logic from get_task to simplify the code.
26751c0b2f7Stbbdev             d1::task* result = *--curr;
26851c0b2f7Stbbdev             if( result && task_accessor::isolation(*result) == isolation ) {
26951c0b2f7Stbbdev                 if( queue.end() - curr == 1 )
27051c0b2f7Stbbdev                     queue.pop_back(); // a little of housekeeping along the way
27151c0b2f7Stbbdev                 else
27257f524caSIlya Isaev                     *curr = nullptr;      // grabbing task with the same isolation
27351c0b2f7Stbbdev                 // TODO: move one of the container's ends instead if the task has been found there
27451c0b2f7Stbbdev                 return result;
27551c0b2f7Stbbdev             }
27651c0b2f7Stbbdev         } while( curr != queue.begin() );
27757f524caSIlya Isaev         return nullptr;
27851c0b2f7Stbbdev     }
27951c0b2f7Stbbdev 
28051c0b2f7Stbbdev }; // task_stream
28151c0b2f7Stbbdev 
28251c0b2f7Stbbdev } // namespace r1
28351c0b2f7Stbbdev } // namespace detail
28451c0b2f7Stbbdev } // namespace tbb
28551c0b2f7Stbbdev 
28651c0b2f7Stbbdev #endif /* _TBB_task_stream_H */
287