/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_arena_H
#define _TBB_arena_H

#include <atomic>
#include <cstring>

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/spin_mutex.h"

#include "scheduler_common.h"
#include "intrusive_list.h"
#include "task_stream.h"
#include "arena_slot.h"
#include "rml_tbb.h"
#include "mailbox.h"
#include "governor.h"
#include "concurrent_monitor.h"
#include "observer_proxy.h"
#include "thread_control_monitor.h"
#include "threading_control_client.h"

namespace tbb {
namespace detail {
namespace r1 {

class task_dispatcher;
class task_group_context;
class threading_control;
class allocate_root_with_context_proxy;

#if __TBB_ARENA_BINDING
class numa_binding_observer;
#endif /*__TBB_ARENA_BINDING*/

//! Bounded LIFO ring buffer that caches coroutine-bound schedulers (task_dispatchers)
class arena_co_cache {
    //! Ring buffer storage
    task_dispatcher** my_co_scheduler_cache;
    //! Current cache index
    unsigned my_head;
    //! Cache capacity for arena
    unsigned my_max_index;
    //! Accessor lock for modification operations
    tbb::spin_mutex my_co_cache_mutex;

    unsigned next_index() {
        return ( my_head == my_max_index ) ? 0 : my_head + 1;
    }

    unsigned prev_index() {
        return ( my_head == 0 ) ? my_max_index : my_head - 1;
    }

    bool internal_empty() {
        return my_co_scheduler_cache[prev_index()] == nullptr;
    }

    void internal_task_dispatcher_cleanup(task_dispatcher* to_cleanup) {
        to_cleanup->~task_dispatcher();
        cache_aligned_deallocate(to_cleanup);
    }

public:
    void init(unsigned cache_capacity) {
        std::size_t alloc_size = cache_capacity * sizeof(task_dispatcher*);
        my_co_scheduler_cache = (task_dispatcher**)cache_aligned_allocate(alloc_size);
        std::memset( my_co_scheduler_cache, 0, alloc_size );
        my_head = 0;
        my_max_index = cache_capacity - 1;
    }

    void cleanup() {
        while (task_dispatcher* to_cleanup = pop()) {
            internal_task_dispatcher_cleanup(to_cleanup);
        }
        cache_aligned_deallocate(my_co_scheduler_cache);
    }

    //! Insert a scheduler into the currently available place.
    //! Replace an old value, if necessary.
    void push(task_dispatcher* s) {
        task_dispatcher* to_cleanup = nullptr;
        {
            tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
            // Check if we are replacing an existing buffer entry
            if (my_co_scheduler_cache[my_head] != nullptr) {
                to_cleanup = my_co_scheduler_cache[my_head];
            }
            // Store the cached value
            my_co_scheduler_cache[my_head] = s;
            // Move head index to the next slot
            my_head = next_index();
        }
        // Clean up the replaced buffer entry, if any
        if (to_cleanup) {
            internal_task_dispatcher_cleanup(to_cleanup);
        }
    }

    //! Get a cached scheduler if any
    task_dispatcher* pop() {
        tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
        // No cached coroutine
        if (internal_empty()) {
            return nullptr;
        }
        // Move head index to the currently available value
        my_head = prev_index();
        // Retrieve the value from the buffer
        task_dispatcher* to_return = my_co_scheduler_cache[my_head];
        // Clear the previous entry
        my_co_scheduler_cache[my_head] = nullptr;
        return to_return;
    }
};
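
// Usage sketch (illustrative only, not part of the header interface): the arena owns one
// arena_co_cache and recycles coroutine-bound task_dispatchers through it, e.g.
//     arena_co_cache cache;
//     cache.init(8);                         // fixed capacity; the oldest entry is overwritten on overflow
//     cache.push(dispatcher);                // return a no-longer-needed dispatcher to the cache
//     if (task_dispatcher* d = cache.pop())  // reuse the most recently pushed one (LIFO), if any
//         /* reuse d */;
//     cache.cleanup();                       // destroy whatever is still cached and free the storage
// The name `dispatcher` above is a placeholder for a task_dispatcher obtained elsewhere.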

struct stack_anchor_type {
    stack_anchor_type() = default;
    stack_anchor_type(const stack_anchor_type&) = delete;
};

class atomic_flag {
    static const std::uintptr_t SET = 1;
    static const std::uintptr_t UNSET = 0;
    std::atomic<std::uintptr_t> my_state{UNSET};
public:
    bool test_and_set() {
        std::uintptr_t state = my_state.load(std::memory_order_acquire);
        switch (state) {
        case SET:
            return false;
        default: /* busy */
            if (my_state.compare_exchange_strong(state, SET)) {
                // We interrupted clear transaction
                return false;
            }
            if (state != UNSET) {
                // We lost our epoch
                return false;
            }
            // We are too late but still in the same epoch
            __TBB_fallthrough;
        case UNSET:
            return my_state.compare_exchange_strong(state, SET);
        }
    }
    template <typename Pred>
    bool try_clear_if(Pred&& pred) {
        std::uintptr_t busy = std::uintptr_t(&busy);
        std::uintptr_t state = my_state.load(std::memory_order_acquire);
        if (state == SET && my_state.compare_exchange_strong(state, busy)) {
            if (pred()) {
                return my_state.compare_exchange_strong(busy, UNSET);
            }
            // The result of the next operation is discarded; false must always be returned.
            my_state.compare_exchange_strong(busy, SET);
        }
        return false;
    }
    bool test(std::memory_order order = std::memory_order_acquire) {
        return my_state.load(order) != UNSET;
    }
};
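
// Note on the states above (a reading of the code, not normative documentation): my_state is
// either UNSET (0), SET (1), or a transient "busy" value -- the address of a local variable
// inside try_clear_if(), which is guaranteed to differ from both 0 and 1 and is unique per call.
// test_and_set() treats an observed busy value as an interrupted clear transaction and reports
// false, while try_clear_if() clears the flag only if the predicate still holds after the flag
// has been parked in the busy state; otherwise it restores SET and returns false.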

//! The structure of an arena, except the array of slots.
/** Separated in order to simplify padding.
    Intrusive list node base class is used by the market to form a list of arenas. **/
// TODO: Analyze arena_base cache lines placement
struct arena_base : padded<intrusive_list_node> {
    //! The number of workers that have been marked out by the resource manager to service the arena.
    std::atomic<unsigned> my_num_workers_allotted;   // heavy use in stealing loop

    //! Reference counter for the arena.
    /** Worker and external thread references are counted separately: the first several bits are for references
        from external threads or explicit task_arenas (see arena::ref_external_bits below);
        the rest counts the number of workers servicing the arena. */
    std::atomic<unsigned> my_references;     // heavy use in stealing loop

    //! The maximal number of currently busy slots.
    std::atomic<unsigned> my_limit;          // heavy use in stealing loop

    //! Task pool for the tasks scheduled via task::enqueue() method.
    /** Such scheduling guarantees eventual execution even if
        - new tasks are constantly coming (by extracting scheduled tasks in
          relaxed FIFO order);
        - the enqueuing thread does not call any of wait_for_all methods. **/
    task_stream<front_accessor> my_fifo_task_stream; // heavy use in stealing loop

    //! Task pool for the tasks scheduled via tbb::resume() function.
    task_stream<front_accessor> my_resume_task_stream; // heavy use in stealing loop

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Task pool for the tasks with critical property set.
    /** Critical tasks are scheduled for execution ahead of other sources (including the local task pool
        and even bypassed tasks) unless the thread already executes a critical task in an outer
        dispatch loop **/
    // used on the hot path of the task dispatch loop
    task_stream<back_nonnull_accessor> my_critical_task_stream;
#endif

    //! The total number of workers that are requested from the resource manager.
    int my_total_num_workers_requested;

    //! The index in the array of per-priority lists of arenas this object is in.
    /*const*/ unsigned my_priority_level;

    //! The max priority level of arena in permit manager.
    std::atomic<bool> my_is_top_priority{false};

    //! Current task pool state and estimate of the number of available tasks.
    atomic_flag my_pool_state;

    //! The list of local observers attached to this arena.
    observer_list my_observers;

#if __TBB_ARENA_BINDING
    //! Pointer to internal observer that allows binding threads in the arena to a certain NUMA node.
    numa_binding_observer* my_numa_binding_observer{nullptr};
#endif /*__TBB_ARENA_BINDING*/

    // Below are rarely modified members

    threading_control* my_threading_control;

    //! Default task group context.
    d1::task_group_context* my_default_ctx;

    //! Waiting object for external threads that cannot join the arena.
    concurrent_monitor my_exit_monitors;

    //! Coroutines (task_dispatchers) cache buffer
    arena_co_cache my_co_cache;

    // arena needs an extra worker despite the arena limit
    atomic_flag my_mandatory_concurrency;
    // the number of local mandatory concurrency requests
    int my_mandatory_requests;

    //! The number of slots in the arena.
    unsigned my_num_slots;
    //! The number of reserved slots (can be occupied only by external threads).
    unsigned my_num_reserved_slots;
    //! The number of workers requested by the external thread owning the arena.
    unsigned my_max_num_workers;

    threading_control_client my_tc_client;

#if TBB_USE_ASSERT
    //! Used to trap accesses to the object after its destruction.
    std::uintptr_t my_guard;
#endif /* TBB_USE_ASSERT */
}; // struct arena_base

class arena: public padded<arena_base>
{
public:
    using base_type = padded<arena_base>;

    //! Types of work advertised by advertise_new_work()
    enum new_work_type {
        work_spawned,
        wakeup,
        work_enqueued
    };

    //! Constructor
    arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level);

    //! Allocate an instance of arena.
    static arena& allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                                 unsigned priority_level);

    static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints = d1::constraints{});

    static unsigned num_arena_slots ( unsigned num_slots, unsigned num_reserved_slots ) {
        return num_reserved_slots == 0 ? num_slots : max(2u, num_slots);
    }
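
    // Working the expression above through with concrete values: num_arena_slots(1, 1) == 2,
    // while num_arena_slots(1, 0) == 1 and num_arena_slots(4, 1) == 4. In other words, an arena
    // with at least one reserved (external-thread-only) slot is always given a second slot,
    // presumably so that a worker can still join even a nominally single-slot arena.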

    static int allocation_size( unsigned num_slots ) {
        return sizeof(base_type) + num_slots * (sizeof(mail_outbox) + sizeof(arena_slot) + sizeof(task_dispatcher));
    }

    //! Get reference to mailbox corresponding to given slot_id
    mail_outbox& mailbox( d1::slot_id slot ) {
        __TBB_ASSERT( slot != d1::no_slot, "affinity should be specified" );

        return reinterpret_cast<mail_outbox*>(this)[-(int)(slot+1)]; // cast to 'int' is redundant but left for readability
    }
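
    // Layout note (inferred from allocation_size() and the negative indexing above; the actual
    // placement is done where the arena storage is allocated and constructed): the per-slot
    // mail_outbox objects are expected to sit immediately *before* the arena object within the
    // same cache-aligned allocation, with the slots and per-slot task_dispatchers following it:
    //     [ mail_outbox #N-1 ... mail_outbox #1, mail_outbox #0 ][ arena (incl. my_slots) ][ dispatchers ]
    // so mailbox(slot) steps backwards from `this` by (slot + 1) mail_outbox-sized strides.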

    //! Completes arena shutdown, destructs and deallocates it.
    void free_arena();

    //! The number of least significant bits for external references
    static const unsigned ref_external_bits = 12; // up to 4095 external and 1M workers

    //! Reference increment values for externals and workers
    static const unsigned ref_external = 1;
    static const unsigned ref_worker   = 1 << ref_external_bits;
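
    // In other words, my_references is a packed counter: bits [0, ref_external_bits) hold the
    // number of external references and the remaining bits hold the worker count, e.g.
    //     my_references += ref_external;  // one more external thread / explicit task_arena reference
    //     my_references += ref_worker;    // one more worker servicing the arena
    // which is why num_workers_active() below simply shifts right by ref_external_bits.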

    //! The number of workers active in the arena.
    unsigned num_workers_active() const {
        return my_references.load(std::memory_order_acquire) >> ref_external_bits;
    }

    //! Check if the recall is requested by the market.
    bool is_recall_requested() const {
        return num_workers_active() > my_num_workers_allotted.load(std::memory_order_relaxed);
    }

    void request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads = false);

    //! If necessary, raise a flag that there is new job in arena.
    template<arena::new_work_type work_type> void advertise_new_work();

    //! Attempts to steal a task from a randomly chosen arena slot
    d1::task* steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation);

    //! Get a task from a global starvation resistant queue
    template<task_stream_accessor_type accessor>
    d1::task* get_stream_task(task_stream<accessor>& stream, unsigned& hint);

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Tries to find a critical task in global critical task stream
    d1::task* get_critical_task(unsigned& hint, isolation_type isolation);
#endif

    //! Check if there is a job anywhere in the arena.
    void out_of_work();

    //! Enqueue a task into the starvation-resistant queue
    void enqueue_task(d1::task&, d1::task_group_context&, thread_data&);

    //! Registers the worker with the arena and enters TBB scheduler dispatch loop
    void process(thread_data&);

    //! Notification that the thread leaves its arena
    void on_thread_leaving(unsigned ref_param);

    //! Check for the presence of enqueued tasks
    bool has_enqueued_tasks();

    //! Check for the presence of any tasks
    bool has_tasks();

    bool is_empty() { return my_pool_state.test() == /* EMPTY */ false; }

    thread_control_monitor& get_waiting_threads_monitor();

    static const std::size_t out_of_arena = ~size_t(0);
    //! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
    template <bool as_worker>
    std::size_t occupy_free_slot(thread_data&);
    //! Tries to occupy a slot in the specified range.
    std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);

    std::uintptr_t calculate_stealing_threshold();

    unsigned priority_level() { return my_priority_level; }

    bool has_request() { return my_total_num_workers_requested != 0; }

    unsigned references() const { return my_references.load(std::memory_order_acquire); }

    bool is_arena_workerless() const { return my_max_num_workers == 0; }

    void set_top_priority(bool);

    bool is_top_priority() const;

    bool try_join();

    void set_allotment(unsigned allotment);

    int update_concurrency(unsigned concurrency);

    std::pair</*min workers = */ int, /*max workers = */ int> update_request(int mandatory_delta, int workers_delta);

    /** Must be the last data field */
    arena_slot my_slots[1];
}; // class arena

template <arena::new_work_type work_type>
void arena::advertise_new_work() {
    bool is_mandatory_needed = false;
    bool are_workers_needed = false;

    if (work_type != work_spawned) {
        // Local memory fence here and below is required to avoid missed wakeups; see the comment below.
        // Starvation-resistant tasks require concurrency, so missed wakeups are unacceptable.
        atomic_fence_seq_cst();
    }

    if (work_type == work_enqueued && my_num_slots > my_num_reserved_slots) {
        is_mandatory_needed = my_mandatory_concurrency.test_and_set();
    }

    // Double-check idiom that, in case of spawning, is deliberately sloppy about memory fences.
    // Technically, to avoid missed wakeups, there should be a full memory fence between the point we
    // released the task pool (i.e. spawned task) and read the arena's state. However, adding such a
    // fence might hurt overall performance more than it helps, because the fence would be executed
    // on every task pool release, even when stealing does not occur. Since TBB allows parallelism,
    // but never promises parallelism, the missed wakeup is not a correctness problem.
    are_workers_needed = my_pool_state.test_and_set();

    if (is_mandatory_needed || are_workers_needed) {
        int mandatory_delta = is_mandatory_needed ? 1 : 0;
        int workers_delta = are_workers_needed ? my_max_num_workers : 0;

        if (is_mandatory_needed && is_arena_workerless()) {
            // Set workers_delta to 1 to keep arena invariants consistent
            workers_delta = 1;
        }

        request_workers(mandatory_delta, workers_delta, /* wakeup_threads = */ true);
    }
}
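
// For orientation (a sketch of how the work_type parameter is meant to be used, based on the
// enum above rather than anything defined in this header): callers instantiate the template as
//     a->advertise_new_work<arena::work_enqueued>();  // after pushing into a starvation-resistant stream
//     a->advertise_new_work<arena::work_spawned>();   // after releasing a spawned task to the local task pool
// Only the work_spawned path skips the sequentially consistent fence at the top of the function.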

inline d1::task* arena::steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation) {
    auto slot_num_limit = my_limit.load(std::memory_order_relaxed);
    if (slot_num_limit == 1) {
        // No slots to steal from
        return nullptr;
    }
    // Try to steal a task from a random victim.
    std::size_t k = frnd.get() % (slot_num_limit - 1);
    // The following condition excludes the external thread that might have
    // already taken our previous place in the arena from the list
    // of potential victims. But since such a situation can take
    // place only in case of significant oversubscription, keeping
    // the checks simple seems to be preferable to complicating the code.
    if (k >= arena_index) {
        ++k; // Adjusts random distribution to exclude self
    }
    arena_slot* victim = &my_slots[k];
    d1::task **pool = victim->task_pool.load(std::memory_order_relaxed);
    d1::task *t = nullptr;
    if (pool == EmptyTaskPool || !(t = victim->steal_task(*this, isolation, k))) {
        return nullptr;
    }
    if (task_accessor::is_proxy_task(*t)) {
        task_proxy &tp = *(task_proxy*)t;
        d1::slot_id slot = tp.slot;
        t = tp.extract_task<task_proxy::pool_bit>();
        if (!t) {
            // Proxy was empty, so it's our responsibility to free it
            tp.allocator.delete_object(&tp, ed);
            return nullptr;
        }
        // Note affinity is called for any stolen task (proxy or general)
        ed.affinity_slot = slot;
    } else {
        // Note affinity is called for any stolen task (proxy or general)
        ed.affinity_slot = d1::any_slot;
    }
    // Update task owner thread id to identify stealing
    ed.original_slot = k;
    return t;
}
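
// Victim selection above in numbers (an illustration of the arithmetic, not additional logic):
// with slot_num_limit == 4 and arena_index == 1, k is first drawn uniformly from {0, 1, 2};
// the `if (k >= arena_index) ++k;` step then maps those values to {0, 2, 3}, i.e. every slot
// except the caller's own, each with equal probability.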

template<task_stream_accessor_type accessor>
inline d1::task* arena::get_stream_task(task_stream<accessor>& stream, unsigned& hint) {
    if (stream.empty())
        return nullptr;
    return stream.pop(subsequent_lane_selector(hint));
}
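
// The `hint` passed by reference is the caller's per-thread lane cursor: the selector is presumed
// to start probing from that lane and to update the hint to wherever a task was found, so
// consecutive calls tend to keep draining the same region of the stream (see task_stream.h for
// the actual selector semantics).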

#if __TBB_PREVIEW_CRITICAL_TASKS
// Retrieves critical task respecting isolation level, if provided. The rule is:
// 1) If no outer critical task and no isolation => take any critical task
// 2) If working on an outer critical task and no isolation => cannot take any critical task
// 3) If no outer critical task but isolated => respect isolation
// 4) If working on an outer critical task and isolated => respect isolation
// The hint is used to keep some LIFO-ness; the search starts with the lane that was used during the push operation.
inline d1::task* arena::get_critical_task(unsigned& hint, isolation_type isolation) {
    if (my_critical_task_stream.empty())
        return nullptr;

    if ( isolation != no_isolation ) {
        return my_critical_task_stream.pop_specific( hint, isolation );
    } else {
        return my_critical_task_stream.pop(preceding_lane_selector(hint));
    }
}
#endif // __TBB_PREVIEW_CRITICAL_TASKS

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_arena_H */