/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_arena_slot_H
#define _TBB_arena_slot_H

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_template_helpers.h"
#include "oneapi/tbb/detail/_task.h"

#include "oneapi/tbb/cache_aligned_allocator.h"

#include "misc.h"
#include "mailbox.h"
#include "scheduler_common.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class arena;
class task_group_context;

//--------------------------------------------------------------------------------------------------------
// Arena Slot
//--------------------------------------------------------------------------------------------------------

static d1::task** const EmptyTaskPool  = nullptr;
static d1::task** const LockedTaskPool = reinterpret_cast<d1::task**>(~std::intptr_t(0));
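
// These two sentinels give the per-slot `task_pool` pointer (declared below) three
// distinguishable states. An illustrative helper showing how the states are told
// apart (hypothetical, not part of this header):
//
//     enum class pool_state { empty, locked, published };
//     inline pool_state classify_pool(d1::task** p) {
//         if (p == EmptyTaskPool)  return pool_state::empty;     // slot not published, nothing to steal
//         if (p == LockedTaskPool) return pool_state::locked;    // owner or a thief holds the pool lock
//         return pool_state::published;                          // p points at the owner's task_pool_ptr
//     }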

struct alignas(max_nfs_size) arena_slot_shared_state {
    //! Scheduler of the thread attached to the slot
    /** Marks the slot as busy, and is used to iterate through the schedulers belonging to this arena **/
    std::atomic<bool> my_is_occupied;

    // Synchronization of access to Task pool
    /** Also is used to specify if the slot is empty or locked:
         0 - empty
        -1 - locked **/
    std::atomic<d1::task**> task_pool;

    //! Index of the first ready task in the deque.
    /** Modified by thieves, and by the owner during compaction/reallocation **/
    std::atomic<std::size_t> head;
};

struct alignas(max_nfs_size) arena_slot_private_state {
    //! Hint provided for operations with the container of starvation-resistant tasks.
    /** Modified by the owner thread (during these operations). **/
    unsigned hint_for_fifo_stream;

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Similar to 'hint_for_fifo_stream' but for critical tasks.
    unsigned hint_for_critical_stream;
#endif

    //! Similar to 'hint_for_fifo_stream' but for the resume tasks.
    unsigned hint_for_resume_stream;

    //! Index of the element following the last ready task in the deque.
    /** Modified by the owner thread. **/
    std::atomic<std::size_t> tail;

    //! Capacity of the primary task pool (number of elements - pointers to task).
    std::size_t my_task_pool_size;

    //! Task pool of the scheduler that owns this slot
    // TODO: previously was task** __TBB_atomic, but it does not seem to be accessed by other threads
    d1::task** task_pool_ptr;
};
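
// The split into a shared and a private base, each padded to max_nfs_size, appears
// intended to keep thieves' updates of `head`/`task_pool` from false-sharing a cache
// line with the owner's updates of `tail`. A minimal sketch of the compile-time check
// this layout permits (illustrative, not present in the original sources):
//
//     static_assert(alignof(arena_slot_shared_state)  == max_nfs_size,
//                   "shared slot state must start on its own cache line");
//     static_assert(alignof(arena_slot_private_state) == max_nfs_size,
//                   "private slot state must start on its own cache line");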

class arena_slot : private arena_slot_shared_state, private arena_slot_private_state {
    friend class arena;
    friend class outermost_worker_waiter;
    friend class task_dispatcher;
    friend class thread_data;
    friend class nested_arena_context;

    //! The original task dispatcher associated with this slot
    task_dispatcher* my_default_task_dispatcher;

#if TBB_USE_ASSERT
    void fill_with_canary_pattern ( std::size_t first, std::size_t last ) {
        for ( std::size_t i = first; i < last; ++i )
            poison_pointer(task_pool_ptr[i]);
    }
#else
    void fill_with_canary_pattern ( size_t, std::size_t ) {}
#endif /* TBB_USE_ASSERT */

    static constexpr std::size_t min_task_pool_size = 64;

    void allocate_task_pool( std::size_t n ) {
        std::size_t byte_size = ((n * sizeof(d1::task*) + max_nfs_size - 1) / max_nfs_size) * max_nfs_size;
        my_task_pool_size = byte_size / sizeof(d1::task*);
        task_pool_ptr = (d1::task**)cache_aligned_allocate(byte_size);
        // No need to clear the fresh deque since valid items are designated by the head and tail members.
        // But fill it with a canary pattern in the high vigilance debug mode.
        fill_with_canary_pattern( 0, my_task_pool_size );
    }
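
    // Worked example of the size computation above (assuming a 64-bit build where
    // sizeof(d1::task*) == 8 and max_nfs_size == 128; both are assumptions about a
    // typical configuration, not guarantees):
    //
    //     n = 70 task pointers      -> 70 * 8 = 560 bytes requested
    //     round up to max_nfs_size  -> ((560 + 127) / 128) * 128 = 640 bytes
    //     my_task_pool_size         -> 640 / 8 = 80 slots
    //
    // i.e. the capacity is rounded up so the allocation always covers whole
    // cache-line-sized chunks.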

public:
    //! Deallocate task pool that was allocated by means of allocate_task_pool.
    void free_task_pool( ) {
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( !task_pool /* TODO: == EmptyTaskPool */, nullptr);
        if( task_pool_ptr ) {
            __TBB_ASSERT( my_task_pool_size, nullptr);
            cache_aligned_deallocate( task_pool_ptr );
            task_pool_ptr = nullptr;
            my_task_pool_size = 0;
        }
    }

    //! Get a task from the local pool.
    /** Called only by the pool owner.
        Returns the pointer to the task or nullptr if a suitable task is not found.
        Resets the pool if it is empty. **/
    d1::task* get_task(execution_data_ext&, isolation_type);

    //! Steal task from slot's ready pool
    d1::task* steal_task(arena&, isolation_type, std::size_t);

    //! Some thread is now the owner of this slot
    void occupy() {
        __TBB_ASSERT(!my_is_occupied.load(std::memory_order_relaxed), nullptr);
        my_is_occupied.store(true, std::memory_order_release);
    }

    //! Try to occupy the slot
    bool try_occupy() {
        return !is_occupied() && my_is_occupied.exchange(true) == false;
    }
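
    // Illustrative usage sketch (hypothetical caller, not part of this header): a
    // thread joining an arena might scan its slots and claim the first free one.
    //
    //     std::size_t claimed = num_slots;               // "no slot" marker (assumed)
    //     for (std::size_t i = 0; i < num_slots; ++i) {
    //         if (slots[i].try_occupy()) { claimed = i; break; }
    //     }
    //
    // The relaxed is_occupied() pre-check lets the loop skip taken slots without
    // paying for an atomic exchange on each of them.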

    //! The thread releases ownership of this slot
    void release() {
        __TBB_ASSERT(my_is_occupied.load(std::memory_order_relaxed), nullptr);
        my_is_occupied.store(false, std::memory_order_release);
    }

    //! Spawn newly created tasks
    void spawn(d1::task& t) {
        std::size_t T = prepare_task_pool(1);
        __TBB_ASSERT(is_poisoned(task_pool_ptr[T]), nullptr);
        task_pool_ptr[T] = &t;
        commit_spawned_tasks(T + 1);
        if (!is_task_pool_published()) {
            publish_task_pool();
        }
    }

    bool is_task_pool_published() const {
        return task_pool.load(std::memory_order_relaxed) != EmptyTaskPool;
    }

    bool is_empty() const {
        return task_pool.load(std::memory_order_relaxed) == EmptyTaskPool ||
               head.load(std::memory_order_relaxed) >= tail.load(std::memory_order_relaxed);
    }

    bool is_occupied() const {
        return my_is_occupied.load(std::memory_order_relaxed);
    }

    task_dispatcher& default_task_dispatcher() {
        __TBB_ASSERT(my_default_task_dispatcher != nullptr, nullptr);
        return *my_default_task_dispatcher;
    }

    void init_task_streams(unsigned h) {
        hint_for_fifo_stream = h;
#if __TBB_RESUMABLE_TASKS
        hint_for_resume_stream = h;
#endif
#if __TBB_PREVIEW_CRITICAL_TASKS
        hint_for_critical_stream = h;
#endif
    }

#if __TBB_PREVIEW_CRITICAL_TASKS
    unsigned& critical_hint() {
        return hint_for_critical_stream;
    }
#endif
private:
    //! Get a task from the local pool at specified location T.
    /** Returns the pointer to the task or nullptr if the task cannot be executed,
        e.g. proxy has been deallocated or isolation constraint is not met.
        tasks_omitted tells if some tasks have been omitted.
        Called only by the pool owner. The caller should guarantee that the
        position T is not available for a thief. **/
    d1::task* get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_omitted, isolation_type isolation);

    //! Makes sure that the task pool can accommodate at least n more elements
    /** If necessary relocates existing task pointers or grows the ready task deque.
     *  Returns the (possibly updated) tail index (not accounting for n). **/
    std::size_t prepare_task_pool(std::size_t num_tasks) {
        std::size_t T = tail.load(std::memory_order_relaxed); // mirror
        if ( T + num_tasks <= my_task_pool_size ) {
            return T;
        }

        std::size_t new_size = num_tasks;
        if ( !my_task_pool_size ) {
            __TBB_ASSERT( !is_task_pool_published() && is_quiescent_local_task_pool_reset(), nullptr);
            __TBB_ASSERT( !task_pool_ptr, nullptr);
            if ( num_tasks < min_task_pool_size ) new_size = min_task_pool_size;
            allocate_task_pool( new_size );
            return 0;
        }
        acquire_task_pool();
        std::size_t H = head.load(std::memory_order_relaxed); // mirror
        d1::task** new_task_pool = task_pool_ptr;
        __TBB_ASSERT( my_task_pool_size >= min_task_pool_size, nullptr);
        // Count not skipped tasks. Consider using std::count_if.
        for ( std::size_t i = H; i < T; ++i )
            if ( new_task_pool[i] ) ++new_size;
        // If the free space at the beginning of the task pool is too short, we
        // are likely facing a pathological single-producer-multiple-consumers
        // scenario, and thus it's better to expand the task pool
        bool allocate = new_size > my_task_pool_size - min_task_pool_size/4;
        if ( allocate ) {
            // Grow task pool. As this operation is rare, and its cost is asymptotically
            // amortizable, we can tolerate new task pool allocation done under the lock.
            if ( new_size < 2 * my_task_pool_size )
                new_size = 2 * my_task_pool_size;
            allocate_task_pool( new_size ); // updates my_task_pool_size
        }
        // Filter out skipped tasks. Consider using std::copy_if.
        std::size_t T1 = 0;
        for ( std::size_t i = H; i < T; ++i ) {
            if ( new_task_pool[i] ) {
                task_pool_ptr[T1++] = new_task_pool[i];
            }
        }
        // Deallocate the previous task pool if a new one has been allocated.
        if ( allocate )
            cache_aligned_deallocate( new_task_pool );
        else
            fill_with_canary_pattern( T1, tail );
        // Publish the new state.
        commit_relocated_tasks( T1 );
        // assert_task_pool_valid();
        return T1;
    }
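
    // Worked example of the growth decision above (numbers are illustrative): with
    // my_task_pool_size == 64 and min_task_pool_size == 64, the threshold is
    // 64 - 64/4 = 48. If num_tasks plus the surviving (non-null) tasks between head
    // and tail exceed 48, a pool at least twice the current size is allocated;
    // otherwise the surviving tasks are merely compacted to the front of the
    // existing array and the freed tail region is re-poisoned in debug builds.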

    //! Makes newly spawned tasks visible to thieves
    void commit_spawned_tasks(std::size_t new_tail) {
        __TBB_ASSERT (new_tail <= my_task_pool_size, "task deque end was overwritten");
        // emit "task was released" signal
        // Release fence is necessary to make sure that previously stored task pointers
        // are visible to thieves.
        tail.store(new_tail, std::memory_order_release);
    }
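
    // The release store above pairs with an acquire load of `tail` on the thief's
    // side. A minimal sketch of that pairing (hypothetical thief-side code, shown
    // only to illustrate the memory-ordering contract):
    //
    //     std::size_t t = victim.tail.load(std::memory_order_acquire);
    //     std::size_t h = victim.head.load(std::memory_order_relaxed);
    //     if (h < t) {
    //         d1::task* candidate = victim_pool[h];   // every element below `t` was
    //                                                 // stored before the release of tail
    //     }
    //
    // Without the release/acquire pair a thief could observe the new tail value but
    // a stale task pointer.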

    //! Used by workers to enter the task pool
    /** Does not lock the task pool, because the arena slot has already been successfully grabbed. **/
    void publish_task_pool() {
        __TBB_ASSERT ( task_pool == EmptyTaskPool, "someone else grabbed my arena slot?" );
        __TBB_ASSERT ( head.load(std::memory_order_relaxed) < tail.load(std::memory_order_relaxed),
                       "entering arena without tasks to share" );
        // Release signal on behalf of previously spawned tasks (when this thread was not in arena yet)
        task_pool.store(task_pool_ptr, std::memory_order_release );
    }

    //! Locks the local task pool
    /** Garbles task_pool for the duration of the lock. Requires correctly set task_pool_ptr.
        ATTENTION: This method is mostly the same as generic_scheduler::lock_task_pool(), with
        slightly different slot state checks (the slot is either locked or points to our task
        pool). Thus if either of them is changed, consider changing the counterpart as well. **/
    void acquire_task_pool() {
        if (!is_task_pool_published()) {
            return; // we are not in arena - nothing to lock
        }
        bool sync_prepare_done = false;
        for( atomic_backoff b;;b.pause() ) {
#if TBB_USE_ASSERT
            // Local copy of the arena slot task pool pointer is necessary for the next
            // assertion to work correctly to exclude asynchronous state transition effect.
            d1::task** tp = task_pool.load(std::memory_order_relaxed);
            __TBB_ASSERT( tp == LockedTaskPool || tp == task_pool_ptr, "slot ownership corrupt?" );
#endif
            d1::task** expected = task_pool_ptr;
            if( task_pool.load(std::memory_order_relaxed) != LockedTaskPool &&
                task_pool.compare_exchange_strong(expected, LockedTaskPool ) ) {
                // We acquired our own slot
                break;
            } else if( !sync_prepare_done ) {
                // Start waiting
                sync_prepare_done = true;
            }
            // Someone else acquired a lock, so pause and do exponential backoff.
        }
        __TBB_ASSERT( task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "not really acquired task pool" );
    }
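
    // Typical owner-side pattern around the pool lock (a sketch under the assumption
    // that the caller is the slot owner; not a verbatim excerpt from this library):
    //
    //     acquire_task_pool();       // task_pool now reads LockedTaskPool
    //     /* ... relocate / compact / reset the deque contents ... */
    //     release_task_pool();       // restore task_pool to task_pool_ptr
    //
    // While the pool is locked, thieves spinning in lock_task_pool() back off instead
    // of reading a deque that is being restructured.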

    //! Unlocks the local task pool
    /** Restores task_pool munged by acquire_task_pool. Requires
        correctly set task_pool_ptr. **/
    void release_task_pool() {
        if ( task_pool.load(std::memory_order_relaxed) == EmptyTaskPool )
            return; // we are not in arena - nothing to unlock
        __TBB_ASSERT( task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "arena slot is not locked" );
        task_pool.store( task_pool_ptr, std::memory_order_release );
    }

    //! Locks victim's task pool, and returns pointer to it. The pointer can be nullptr.
    /** Garbles victim_arena_slot->task_pool for the duration of the lock. **/
    d1::task** lock_task_pool() {
        d1::task** victim_task_pool;
        for ( atomic_backoff backoff;; /*backoff pause embedded in the loop*/) {
            victim_task_pool = task_pool.load(std::memory_order_relaxed);
            // Microbenchmarks demonstrated that aborting a stealing attempt when the
            // victim's task pool is locked degrades performance.
            // NOTE: Do not use comparison of head and tail indices to check for
            // the presence of work in the victim's task pool, as they may give
            // incorrect indication because of task pool relocations and resizes.
            if (victim_task_pool == EmptyTaskPool) {
                break;
            }
            d1::task** expected = victim_task_pool;
            if (victim_task_pool != LockedTaskPool && task_pool.compare_exchange_strong(expected, LockedTaskPool) ) {
                // We've locked victim's task pool
                break;
            }
            // Someone else acquired a lock, so pause and do exponential backoff.
            backoff.pause();
        }
        __TBB_ASSERT(victim_task_pool == EmptyTaskPool ||
                    (task_pool.load(std::memory_order_relaxed) == LockedTaskPool &&
                     victim_task_pool != LockedTaskPool), "not really locked victim's task pool?");
        return victim_task_pool;
    }
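
    // Thief-side usage sketch (hypothetical caller; the real stealing logic lives in
    // steal_task() and is more involved):
    //
    //     d1::task** victim_pool = victim_slot.lock_task_pool();
    //     if (victim_pool != EmptyTaskPool) {
    //         /* ... examine head/tail, take a task from the head end ... */
    //         victim_slot.unlock_task_pool(victim_pool);
    //     }
    //
    // The original pointer value is kept so that unlock_task_pool() can restore the
    // victim's task_pool to exactly what it was before locking.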

    //! Unlocks victim's task pool
    /** Restores victim_arena_slot->task_pool munged by lock_task_pool. **/
    void unlock_task_pool(d1::task** victim_task_pool) {
        __TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "victim arena slot is not locked");
        __TBB_ASSERT(victim_task_pool != LockedTaskPool, nullptr);
        task_pool.store(victim_task_pool, std::memory_order_release);
    }

#if TBB_USE_ASSERT
    bool is_local_task_pool_quiescent() const {
        d1::task** tp = task_pool.load(std::memory_order_relaxed);
        return tp == EmptyTaskPool || tp == LockedTaskPool;
    }

    bool is_quiescent_local_task_pool_empty() const {
        __TBB_ASSERT(is_local_task_pool_quiescent(), "Task pool is not quiescent");
        return head.load(std::memory_order_relaxed) == tail.load(std::memory_order_relaxed);
    }

    bool is_quiescent_local_task_pool_reset() const {
        __TBB_ASSERT(is_local_task_pool_quiescent(), "Task pool is not quiescent");
        return head.load(std::memory_order_relaxed) == 0 && tail.load(std::memory_order_relaxed) == 0;
    }
#endif // TBB_USE_ASSERT

    //! Leave the task pool
    /** Leaving task pool automatically releases the task pool if it is locked. **/
    void leave_task_pool() {
        __TBB_ASSERT(is_task_pool_published(), "Not in arena");
        // Do not reset my_arena_index. It will be used to (attempt to) re-acquire the slot next time
        __TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "Task pool must be locked when leaving arena");
        __TBB_ASSERT(is_quiescent_local_task_pool_empty(), "Cannot leave arena when the task pool is not empty");
        // No release fence is necessary here as this assignment precludes external
        // accesses to the local task pool when it becomes visible. Thus it is harmless
        // if it gets hoisted above preceding local bookkeeping manipulations.
        task_pool.store(EmptyTaskPool, std::memory_order_relaxed);
    }

    //! Resets head and tail indices to 0, and leaves task pool
    /** The task pool must be locked by the owner (via acquire_task_pool). **/
    void reset_task_pool_and_leave() {
        __TBB_ASSERT(task_pool.load(std::memory_order_relaxed) == LockedTaskPool, "Task pool must be locked when resetting task pool");
        tail.store(0, std::memory_order_relaxed);
        head.store(0, std::memory_order_relaxed);
        leave_task_pool();
    }

    //! Makes relocated tasks visible to thieves and releases the local task pool.
    /** Obviously, the task pool must be locked when calling this method. **/
    void commit_relocated_tasks(std::size_t new_tail) {
        __TBB_ASSERT(is_local_task_pool_quiescent(), "Task pool must be locked when calling commit_relocated_tasks()");
        head.store(0, std::memory_order_relaxed);
        // Tail is updated last to minimize probability of a thread making arena
        // snapshot being misguided into thinking that this task pool is empty.
        tail.store(new_tail, std::memory_order_release);
        release_task_pool();
    }
};

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // _TBB_arena_slot_H