151c0b2f7Stbbdev /*
2c21e688aSSergey Zheltov Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev #ifndef _TBB_task_stream_H
1851c0b2f7Stbbdev #define _TBB_task_stream_H
1951c0b2f7Stbbdev
2051c0b2f7Stbbdev //! This file is a possible future replacement for the task_stream class implemented in
2151c0b2f7Stbbdev //! task_stream.h. It refactors the code and extends task_stream capabilities by moving lane
2251c0b2f7Stbbdev //! management during operations on caller side. Despite the fact that new implementation should not
2351c0b2f7Stbbdev //! affect performance of the original task stream, analysis on this subject was not made at the
2451c0b2f7Stbbdev //! time it was developed. In addition, it is not clearly seen at the moment that this container
2551c0b2f7Stbbdev //! would be suitable for critical tasks due to linear time complexity on its operations.
2651c0b2f7Stbbdev
2749e08aacStbbdev #include "oneapi/tbb/detail/_utils.h"
2849e08aacStbbdev #include "oneapi/tbb/cache_aligned_allocator.h"
294523a761Stbbdev #include "oneapi/tbb/mutex.h"
3051c0b2f7Stbbdev
3151c0b2f7Stbbdev #include "scheduler_common.h"
3251c0b2f7Stbbdev #include "misc.h" // for FastRandom
3351c0b2f7Stbbdev
3451c0b2f7Stbbdev #include <deque>
3551c0b2f7Stbbdev #include <climits>
3651c0b2f7Stbbdev #include <atomic>
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev namespace tbb {
3951c0b2f7Stbbdev namespace detail {
4051c0b2f7Stbbdev namespace r1 {
4151c0b2f7Stbbdev
//! Essentially, this is just a pair of a queue and a mutex to protect the queue.
/** The reason std::pair is not used is that the code would look less clean
    if field names were replaced with 'first' and 'second'.
    The struct is aligned to max_nfs_size — presumably the cache-line (false-sharing
    avoidance) size, so adjacent lanes in an array do not share a cache line;
    confirm against the definition of max_nfs_size. **/
template< typename T, typename mutex_t >
struct alignas(max_nfs_size) queue_and_mutex {
    // Cache-aligned allocator keeps the deque's internal blocks from sharing
    // cache lines with unrelated data.
    typedef std::deque< T, cache_aligned_allocator<T> > queue_base_t;

    queue_base_t my_queue{};   // the task queue of this lane
    mutex_t my_mutex{};        // protects my_queue; locked by callers of this struct
};
5251c0b2f7Stbbdev
// Bitmask with one bit per lane: bit i is set while lane i is believed to hold tasks
// (see set_one_bit/clear_one_bit usage in task_stream below).
using population_t = uintptr_t;
// population_t-typed literal 1: shifting it (one<<pos) stays well-defined for any
// valid bit position, unlike shifting a plain int literal past 31 bits.
const population_t one = 1;
5551c0b2f7Stbbdev
set_one_bit(std::atomic<population_t> & dest,int pos)5651c0b2f7Stbbdev inline void set_one_bit( std::atomic<population_t>& dest, int pos ) {
5757f524caSIlya Isaev __TBB_ASSERT( pos>=0, nullptr);
5857f524caSIlya Isaev __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
5951c0b2f7Stbbdev dest.fetch_or( one<<pos );
6051c0b2f7Stbbdev }
6151c0b2f7Stbbdev
clear_one_bit(std::atomic<population_t> & dest,int pos)6251c0b2f7Stbbdev inline void clear_one_bit( std::atomic<population_t>& dest, int pos ) {
6357f524caSIlya Isaev __TBB_ASSERT( pos>=0, nullptr);
6457f524caSIlya Isaev __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
6551c0b2f7Stbbdev dest.fetch_and( ~(one<<pos) );
6651c0b2f7Stbbdev }
6751c0b2f7Stbbdev
is_bit_set(population_t val,int pos)6851c0b2f7Stbbdev inline bool is_bit_set( population_t val, int pos ) {
6957f524caSIlya Isaev __TBB_ASSERT( pos>=0, nullptr);
7057f524caSIlya Isaev __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
7151c0b2f7Stbbdev return (val & (one<<pos)) != 0;
7251c0b2f7Stbbdev }
7351c0b2f7Stbbdev
7451c0b2f7Stbbdev struct random_lane_selector :
7551c0b2f7Stbbdev #if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
7651c0b2f7Stbbdev no_assign
7751c0b2f7Stbbdev #else
7851c0b2f7Stbbdev no_copy
7951c0b2f7Stbbdev #endif
8051c0b2f7Stbbdev {
random_lane_selectorrandom_lane_selector8151c0b2f7Stbbdev random_lane_selector( FastRandom& random ) : my_random( random ) {}
operatorrandom_lane_selector8251c0b2f7Stbbdev unsigned operator()( unsigned out_of ) const {
8351c0b2f7Stbbdev __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
8451c0b2f7Stbbdev return my_random.get() & (out_of-1);
8551c0b2f7Stbbdev }
8651c0b2f7Stbbdev private:
8751c0b2f7Stbbdev FastRandom& my_random;
8851c0b2f7Stbbdev };
8951c0b2f7Stbbdev
//! Common base for stateful lane selectors: keeps a reference to the caller-owned
//! "previously used lane" slot that derived selectors advance on each call.
struct lane_selector_base :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
    no_assign
#else
    no_copy
#endif
{
    // Reference to caller-side state so the lane position survives between
    // selector instances (the selector itself is a short-lived functor).
    unsigned& my_previous;
    lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
};
10051c0b2f7Stbbdev
10151c0b2f7Stbbdev struct subsequent_lane_selector : lane_selector_base {
subsequent_lane_selectorsubsequent_lane_selector10251c0b2f7Stbbdev subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
operatorsubsequent_lane_selector10351c0b2f7Stbbdev unsigned operator()( unsigned out_of ) const {
10451c0b2f7Stbbdev __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
10551c0b2f7Stbbdev return (++my_previous &= out_of-1);
10651c0b2f7Stbbdev }
10751c0b2f7Stbbdev };
10851c0b2f7Stbbdev
10951c0b2f7Stbbdev struct preceding_lane_selector : lane_selector_base {
preceding_lane_selectorpreceding_lane_selector11051c0b2f7Stbbdev preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
operatorpreceding_lane_selector11151c0b2f7Stbbdev unsigned operator()( unsigned out_of ) const {
11251c0b2f7Stbbdev __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not power of two." );
11351c0b2f7Stbbdev return (--my_previous &= (out_of-1));
11451c0b2f7Stbbdev }
11551c0b2f7Stbbdev };
11651c0b2f7Stbbdev
11751c0b2f7Stbbdev //! Specializes from which side of the underlying container elements are retrieved. Method must be
11851c0b2f7Stbbdev //! called under corresponding mutex locked.
11951c0b2f7Stbbdev template<task_stream_accessor_type accessor>
12051c0b2f7Stbbdev class task_stream_accessor : no_copy {
12151c0b2f7Stbbdev protected:
1224523a761Stbbdev using lane_t = queue_and_mutex <d1::task*, mutex>;
get_item(lane_t::queue_base_t & queue)12351c0b2f7Stbbdev d1::task* get_item( lane_t::queue_base_t& queue ) {
12451c0b2f7Stbbdev d1::task* result = queue.front();
12551c0b2f7Stbbdev queue.pop_front();
12651c0b2f7Stbbdev return result;
12751c0b2f7Stbbdev }
12851c0b2f7Stbbdev };
12951c0b2f7Stbbdev
13051c0b2f7Stbbdev template<>
13151c0b2f7Stbbdev class task_stream_accessor< back_nonnull_accessor > : no_copy {
13251c0b2f7Stbbdev protected:
1334523a761Stbbdev using lane_t = queue_and_mutex <d1::task*, mutex>;
get_item(lane_t::queue_base_t & queue)13451c0b2f7Stbbdev d1::task* get_item( lane_t::queue_base_t& queue ) {
13551c0b2f7Stbbdev d1::task* result = nullptr;
13651c0b2f7Stbbdev __TBB_ASSERT(!queue.empty(), nullptr);
13751c0b2f7Stbbdev // Isolated task can put zeros in queue see look_specific
13851c0b2f7Stbbdev do {
13951c0b2f7Stbbdev result = queue.back();
14051c0b2f7Stbbdev queue.pop_back();
14151c0b2f7Stbbdev } while ( !result && !queue.empty() );
14251c0b2f7Stbbdev return result;
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev };
14551c0b2f7Stbbdev
//! The container for "fairness-oriented" aka "enqueued" tasks.
/** Tasks live in N independent lanes (queue + mutex each) to spread contention.
    A per-lane bit in 'population' advertises "this lane may hold tasks" so readers
    can skip lanes without locking. Every lock acquisition below uses try_acquire:
    a busy lane is simply skipped and another one is tried. **/
template<task_stream_accessor_type accessor>
class task_stream : public task_stream_accessor< accessor > {
    using lane_t = typename task_stream_accessor<accessor>::lane_t;
    // Lane-occupancy hint bitmask. Read with relaxed ordering and without the lane
    // mutex, so a set bit may be stale; the locked emptiness check is authoritative.
    std::atomic<population_t> population{};
    lane_t* lanes{nullptr};   // cache-aligned array of N lanes; nullptr until initialize()
    unsigned N{};             // lane count; always a power of two (asserted in initialize)

public:
    task_stream() = default;

    //! Allocates and constructs the lanes; must run before any push/pop.
    /** N is n_lanes rounded up to a power of two (minimum 2), capped at the bit
        width of population_t so each lane owns exactly one population bit. **/
    void initialize( unsigned n_lanes ) {
        const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;

        N = n_lanes >= max_lanes ? max_lanes : n_lanes > 2 ? 1 << (tbb::detail::log2(n_lanes - 1) + 1) : 2;
        __TBB_ASSERT( N == max_lanes || (N >= n_lanes && ((N - 1) & N) == 0), "number of lanes miscalculated" );
        __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, nullptr);
        lanes = static_cast<lane_t*>(cache_aligned_allocate(sizeof(lane_t) * N));
        for (unsigned i = 0; i < N; ++i) {
            new (lanes + i) lane_t;   // placement-construct each lane in the raw block
        }
        __TBB_ASSERT( !population.load(std::memory_order_relaxed), nullptr);
    }

    //! Destroys and deallocates the lanes constructed by initialize(), if any.
    ~task_stream() {
        if (lanes) {
            for (unsigned i = 0; i < N; ++i) {
                lanes[i].~lane_t();
            }
            cache_aligned_deallocate(lanes);
        }
    }

    //! Push a task into a lane. Lane selection is performed by passed functor.
    /** Keeps asking the selector for a lane until one whose mutex is free is found;
        try_push only fails when the lane is momentarily locked by another thread. **/
    template<typename lane_selector_t>
    void push(d1::task* source, const lane_selector_t& next_lane ) {
        bool succeed = false;
        unsigned lane = 0;
        do {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT( lane < N, "Incorrect lane index." );
        } while( ! (succeed = try_push( source, lane )) );
    }

    //! Try finding and popping a task using passed functor for lane selection. Last used lane is
    //! updated inside lane selector.
    /** Returns nullptr once the whole stream looks empty. Backs off between probes
        so that spinning on contended/stale population bits stays cheap. **/
    template<typename lane_selector_t>
    d1::task* pop( const lane_selector_t& next_lane ) {
        d1::task* popped = nullptr;
        unsigned lane = 0;
        for (atomic_backoff b; !empty() && !popped; b.pause()) {
            lane = next_lane( /*out_of=*/N);
            __TBB_ASSERT(lane < N, "Incorrect lane index.");
            popped = try_pop(lane);
        }
        return popped;
    }

    //! Try finding and popping a task with the given isolation tag.
    /** Scans lanes round-robin backward starting at last_used_lane, probing only
        lanes whose population bit is set; updates last_used_lane to where the scan
        stopped. Returns nullptr if no matching task was found. **/
    d1::task* pop_specific( unsigned& last_used_lane, isolation_type isolation ) {
        d1::task* result = nullptr;
        // Lane selection is round-robin in backward direction.
        unsigned idx = last_used_lane & (N-1);
        do {
            if( is_bit_set( population.load(std::memory_order_relaxed), idx ) ) {
                lane_t& lane = lanes[idx];
                mutex::scoped_lock lock;
                // Skip the lane rather than block if someone else holds its mutex.
                if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
                    result = look_specific( lane.my_queue, isolation );
                    if( lane.my_queue.empty() )
                        clear_one_bit( population, idx );
                    if( result )
                        break;
                }
            }
            idx=(idx-1)&(N-1);
        } while( !empty() && idx != last_used_lane );   // stop after one full sweep or when drained
        last_used_lane = idx;
        return result;
    }

    //! Checks existence of a task.
    /** Based on the relaxed population hint only, so the answer can be momentarily
        stale with respect to concurrent pushes/pops. **/
    bool empty() {
        return !population.load(std::memory_order_relaxed);
    }

private:
    //! Returns true on successful push, otherwise - false.
    /** Fails only when the lane's mutex is currently held by another thread. **/
    bool try_push(d1::task* source, unsigned lane_idx ) {
        mutex::scoped_lock lock;
        if( lock.try_acquire( lanes[lane_idx].my_mutex ) ) {
            lanes[lane_idx].my_queue.push_back( source );
            // Advertise the lane as non-empty while still holding its mutex.
            set_one_bit( population, lane_idx ); // TODO: avoid atomic op if the bit is already set
            return true;
        }
        return false;
    }

    //! Returns pointer to task on successful pop, otherwise - nullptr.
    /** Fast-fails on a clear population bit; otherwise try-locks the lane and pops
        via the accessor policy (front for FIFO, back skipping placeholders). **/
    d1::task* try_pop( unsigned lane_idx ) {
        if( !is_bit_set( population.load(std::memory_order_relaxed), lane_idx ) )
            return nullptr;
        d1::task* result = nullptr;
        lane_t& lane = lanes[lane_idx];
        mutex::scoped_lock lock;
        if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
            result = this->get_item( lane.my_queue );
            if( lane.my_queue.empty() )
                clear_one_bit( population, lane_idx );
        }
        return result;
    }

    // TODO: unify '*_specific' logic with 'pop' methods above
    //! Linear backward search of one lane's queue for a task with matching isolation.
    /** Caller must hold the lane mutex and guarantee the queue is non-empty.
        A match found at the very back is popped; a match in the middle is replaced
        with a nullptr placeholder (later skipped/drained by the back accessor). **/
    d1::task* look_specific( typename lane_t::queue_base_t& queue, isolation_type isolation ) {
        __TBB_ASSERT( !queue.empty(), nullptr);
        // TODO: add a worst-case performance test and consider an alternative container with better
        // performance for isolation search.
        typename lane_t::queue_base_t::iterator curr = queue.end();
        do {
            // TODO: consider logic from get_task to simplify the code.
            d1::task* result = *--curr;
            if( result && task_accessor::isolation(*result) == isolation ) {
                if( queue.end() - curr == 1 )
                    queue.pop_back(); // a little of housekeeping along the way
                else
                    *curr = nullptr; // grabbing task with the same isolation
                // TODO: move one of the container's ends instead if the task has been found there
                return result;
            }
        } while( curr != queue.begin() );
        return nullptr;   // no task with this isolation in the lane
    }

}; // task_stream
28151c0b2f7Stbbdev
28251c0b2f7Stbbdev } // namespace r1
28351c0b2f7Stbbdev } // namespace detail
28451c0b2f7Stbbdev } // namespace tbb
28551c0b2f7Stbbdev
28651c0b2f7Stbbdev #endif /* _TBB_task_stream_H */
287