/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_task_stream_H
#define _TBB_task_stream_H

//! This file is a possible future replacement for the task_stream class implemented in
//! task_stream.h. It refactors the code and extends task_stream capabilities by moving lane
//! management to the caller side during operations. Although the new implementation is not
//! expected to affect the performance of the original task stream, no such analysis had been
//! done at the time it was developed. In addition, it is not yet clear whether this container
//! is suitable for critical tasks, given the linear time complexity of its operations.

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/cache_aligned_allocator.h"
#include "oneapi/tbb/mutex.h"

#include "scheduler_common.h"
#include "misc.h" // for FastRandom

#include <deque>
#include <climits>
#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

//! Essentially, this is just a pair of a queue and a mutex to protect the queue.
/** The reason std::pair is not used is that the code would look less clean
    if field names were replaced with 'first' and 'second'. **/
template< typename T, typename mutex_t >
struct alignas(max_nfs_size) queue_and_mutex {
    typedef std::deque< T, cache_aligned_allocator<T> > queue_base_t;

    queue_base_t my_queue{};
    mutex_t      my_mutex{};
};

using population_t = uintptr_t;
const population_t one = 1;

inline void set_one_bit( std::atomic<population_t>& dest, int pos ) {
    __TBB_ASSERT( pos>=0, nullptr);
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
    dest.fetch_or( one<<pos );
}

inline void clear_one_bit( std::atomic<population_t>& dest, int pos ) {
    __TBB_ASSERT( pos>=0, nullptr);
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
    dest.fetch_and( ~(one<<pos) );
}

inline bool is_bit_set( population_t val, int pos ) {
    __TBB_ASSERT( pos>=0, nullptr);
    __TBB_ASSERT( pos<int(sizeof(population_t)*CHAR_BIT), nullptr);
    return (val & (one<<pos)) != 0;
}
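
// Illustrative usage sketch of the population-bitmask helpers above: bit i of
// the mask is set while lane i is believed to hold at least one task.
//
//     std::atomic<population_t> population{};
//     set_one_bit( population, 3 );    // lane 3 became non-empty
//     // is_bit_set( population.load(std::memory_order_relaxed), 3 ) is now true
//     clear_one_bit( population, 3 );  // lane 3 drained
//     // is_bit_set( population.load(std::memory_order_relaxed), 3 ) is now false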

struct random_lane_selector :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
        no_assign
#else
        no_copy
#endif
{
    random_lane_selector( FastRandom& random ) : my_random( random ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not a power of two." );
        return my_random.get() & (out_of-1);
    }
private:
    FastRandom& my_random;
};

struct lane_selector_base :
#if __INTEL_COMPILER == 1110 || __INTEL_COMPILER == 1500
        no_assign
#else
        no_copy
#endif
{
    unsigned& my_previous;
    lane_selector_base( unsigned& previous ) : my_previous( previous ) {}
};

struct subsequent_lane_selector : lane_selector_base {
    subsequent_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not a power of two." );
        return (++my_previous &= (out_of-1));
    }
};

struct preceding_lane_selector : lane_selector_base {
    preceding_lane_selector( unsigned& previous ) : lane_selector_base( previous ) {}
    unsigned operator()( unsigned out_of ) const {
        __TBB_ASSERT( ((out_of-1) & out_of) == 0, "number of lanes is not a power of two." );
        return (--my_previous &= (out_of-1));
    }
};
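
// Illustrative sketch of the lane-selector protocol: each functor maps the total
// lane count (always a power of two) to a lane index. With out_of == 8 and a
// cursor starting at 7, subsequent_lane_selector yields 0, 1, 2, ... while
// preceding_lane_selector yields 6, 5, 4, ... (both wrap modulo 8):
//
//     unsigned cursor = 7;
//     subsequent_lane_selector next( cursor );
//     unsigned lane = next( /*out_of=*/8 ); // lane == 0, cursor updated to 0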

//! Determines, via template specialization, from which side of the underlying container
//! elements are retrieved. The method must be called with the corresponding mutex locked.
template<task_stream_accessor_type accessor>
class task_stream_accessor : no_copy {
protected:
    using lane_t = queue_and_mutex <d1::task*, mutex>;
    d1::task* get_item( lane_t::queue_base_t& queue ) {
        d1::task* result = queue.front();
        queue.pop_front();
        return result;
    }
};

template<>
class task_stream_accessor< back_nonnull_accessor > : no_copy {
protected:
    using lane_t = queue_and_mutex <d1::task*, mutex>;
    d1::task* get_item( lane_t::queue_base_t& queue ) {
        d1::task* result = nullptr;
        __TBB_ASSERT(!queue.empty(), nullptr);
        // An isolated task may leave nullptr holes in the queue; see look_specific.
        do {
            result = queue.back();
            queue.pop_back();
        } while ( !result && !queue.empty() );
        return result;
    }
};

//! The container for "fairness-oriented" aka "enqueued" tasks.
template<task_stream_accessor_type accessor>
class task_stream : public task_stream_accessor< accessor > {
    using lane_t = typename task_stream_accessor<accessor>::lane_t;
    std::atomic<population_t> population{};
    lane_t* lanes{nullptr};
    unsigned N{};

public:
    task_stream() = default;

    void initialize( unsigned n_lanes ) {
        const unsigned max_lanes = sizeof(population_t) * CHAR_BIT;

        // Round the requested lane count up to the nearest power of two, clamping to max_lanes.
        N = n_lanes >= max_lanes ? max_lanes : n_lanes > 2 ? 1 << (tbb::detail::log2(n_lanes - 1) + 1) : 2;
        __TBB_ASSERT( N == max_lanes || (N >= n_lanes && ((N - 1) & N) == 0), "number of lanes miscalculated" );
        __TBB_ASSERT( N <= sizeof(population_t) * CHAR_BIT, nullptr);
        lanes = static_cast<lane_t*>(cache_aligned_allocate(sizeof(lane_t) * N));
        for (unsigned i = 0; i < N; ++i) {
            new (lanes + i) lane_t;
        }
        __TBB_ASSERT( !population.load(std::memory_order_relaxed), nullptr);
    }
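
    // Worked example (illustrative): initialize(5) computes log2(5-1) == 2, so
    // N == 1 << 3 == 8 lanes; initialize(1) and initialize(2) both yield N == 2;
    // a request of max_lanes (64 with a 64-bit population_t) or more is clamped
    // to max_lanes, so each lane always maps to one bit of the population mask.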

    ~task_stream() {
        if (lanes) {
            for (unsigned i = 0; i < N; ++i) {
                lanes[i].~lane_t();
            }
            cache_aligned_deallocate(lanes);
        }
    }

    //! Push a task into a lane. Lane selection is performed by the passed functor.
    template<typename lane_selector_t>
    void push( d1::task* source, const lane_selector_t& next_lane ) {
        unsigned lane = 0;
        do {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT( lane < N, "Incorrect lane index." );
        } while( !try_push( source, lane ) );
    }

    //! Try to find and pop a task using the passed functor for lane selection. The last
    //! used lane is updated inside the lane selector.
    template<typename lane_selector_t>
    d1::task* pop( const lane_selector_t& next_lane ) {
        d1::task* popped = nullptr;
        unsigned lane = 0;
        for (atomic_backoff b; !empty() && !popped; b.pause()) {
            lane = next_lane( /*out_of=*/N );
            __TBB_ASSERT(lane < N, "Incorrect lane index.");
            popped = try_pop(lane);
        }
        return popped;
    }
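
    // Minimal usage sketch (illustrative; assumes the front_accessor enumerator
    // from scheduler_common.h, a seed-constructed FastRandom, and a hypothetical
    // d1::task* named some_task):
    //
    //     task_stream<front_accessor> stream;
    //     stream.initialize( /*n_lanes=*/4 );
    //     FastRandom random( /*seed=*/42 );
    //     stream.push( some_task, random_lane_selector( random ) );
    //     unsigned hint = 0;
    //     d1::task* next = stream.pop( subsequent_lane_selector( hint ) );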

    //! Try to find and pop a task with the specified isolation.
    d1::task* pop_specific( unsigned& last_used_lane, isolation_type isolation ) {
        d1::task* result = nullptr;
        // Lane selection is round-robin in the backward direction.
        unsigned idx = last_used_lane & (N-1);
        do {
            if( is_bit_set( population.load(std::memory_order_relaxed), idx ) ) {
                lane_t& lane = lanes[idx];
                mutex::scoped_lock lock;
                if( lock.try_acquire(lane.my_mutex) && !lane.my_queue.empty() ) {
                    result = look_specific( lane.my_queue, isolation );
                    if( lane.my_queue.empty() )
                        clear_one_bit( population, idx );
                    if( result )
                        break;
                }
            }
            idx = (idx-1) & (N-1);
        } while( !empty() && idx != last_used_lane );
        last_used_lane = idx;
        return result;
    }
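
    // Scan-order illustration: with N == 8 and last_used_lane == 3, the loop
    // above visits lanes 3, 2, 1, 0, 7, 6, 5, 4 via idx = (idx-1) & (N-1),
    // stopping early once a task with the requested isolation is found.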

    //! Checks whether any task exists in the stream.
    bool empty() {
        return !population.load(std::memory_order_relaxed);
    }

private:
    //! Returns true on a successful push, false otherwise.
    bool try_push( d1::task* source, unsigned lane_idx ) {
        mutex::scoped_lock lock;
        if( lock.try_acquire( lanes[lane_idx].my_mutex ) ) {
            lanes[lane_idx].my_queue.push_back( source );
            set_one_bit( population, lane_idx ); // TODO: avoid atomic op if the bit is already set
            return true;
        }
        return false;
    }

    //! Returns a pointer to the task on a successful pop, nullptr otherwise.
    d1::task* try_pop( unsigned lane_idx ) {
        if( !is_bit_set( population.load(std::memory_order_relaxed), lane_idx ) )
            return nullptr;
        d1::task* result = nullptr;
        lane_t& lane = lanes[lane_idx];
        mutex::scoped_lock lock;
        if( lock.try_acquire( lane.my_mutex ) && !lane.my_queue.empty() ) {
            result = this->get_item( lane.my_queue );
            if( lane.my_queue.empty() )
                clear_one_bit( population, lane_idx );
        }
        return result;
    }

    // TODO: unify '*_specific' logic with the 'pop' methods above
    d1::task* look_specific( typename lane_t::queue_base_t& queue, isolation_type isolation ) {
        __TBB_ASSERT( !queue.empty(), nullptr);
        // TODO: add a worst-case performance test and consider an alternative container with better
        // performance for isolation search.
        typename lane_t::queue_base_t::iterator curr = queue.end();
        do {
            // TODO: consider logic from get_task to simplify the code.
            d1::task* result = *--curr;
            if( result && task_accessor::isolation(*result) == isolation ) {
                if( queue.end() - curr == 1 )
                    queue.pop_back(); // a little housekeeping along the way
                else
                    *curr = nullptr;  // grab the matching task, leaving a nullptr hole
                // TODO: move one of the container's ends instead if the task has been found there
                return result;
            }
        } while( curr != queue.begin() );
        return nullptr;
    }
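
    // Hole-punching illustration: for a lane queue [A, B, C] (front to back)
    // where only B matches the requested isolation, the backward scan stops at
    // B and replaces it with nullptr, leaving [A, nullptr, C]; the hole is
    // skipped later by the back_nonnull_accessor specialization of get_item.
    // Had C matched, it would simply have been popped from the back.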

}; // task_stream

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_task_stream_H */