/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_arena_H
#define _TBB_arena_H

#include <atomic>
#include <cstring>

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/spin_mutex.h"

#include "scheduler_common.h"
#include "intrusive_list.h"
#include "task_stream.h"
#include "arena_slot.h"
#include "rml_tbb.h"
#include "mailbox.h"
#include "governor.h"
#include "concurrent_monitor.h"
#include "observer_proxy.h"
#include "thread_control_monitor.h"
#include "threading_control_client.h"

namespace tbb {
namespace detail {
namespace r1 {

class task_dispatcher;
class task_group_context;
class threading_control;
class allocate_root_with_context_proxy;

#if __TBB_ARENA_BINDING
class numa_binding_observer;
#endif /*__TBB_ARENA_BINDING*/

//! Bounded LIFO ring buffer that caches coroutine task_dispatchers
class arena_co_cache {
    //! Ring buffer storage
    task_dispatcher** my_co_scheduler_cache;
    //! Current cache index
    unsigned my_head;
    //! Cache capacity for arena
    unsigned my_max_index;
    //! Accessor lock for modification operations
    tbb::spin_mutex my_co_cache_mutex;

    unsigned next_index() {
        return ( my_head == my_max_index ) ? 0 : my_head + 1;
    }

    unsigned prev_index() {
        return ( my_head == 0 ) ? my_max_index : my_head - 1;
    }

    bool internal_empty() {
        return my_co_scheduler_cache[prev_index()] == nullptr;
    }

    void internal_task_dispatcher_cleanup(task_dispatcher* to_cleanup) {
        to_cleanup->~task_dispatcher();
        cache_aligned_deallocate(to_cleanup);
    }

public:
    void init(unsigned cache_capacity) {
        std::size_t alloc_size = cache_capacity * sizeof(task_dispatcher*);
        my_co_scheduler_cache = (task_dispatcher**)cache_aligned_allocate(alloc_size);
        std::memset( my_co_scheduler_cache, 0, alloc_size );
        my_head = 0;
        my_max_index = cache_capacity - 1;
    }

    void cleanup() {
        while (task_dispatcher* to_cleanup = pop()) {
            internal_task_dispatcher_cleanup(to_cleanup);
        }
        cache_aligned_deallocate(my_co_scheduler_cache);
    }

    //! Insert a scheduler into the currently available slot.
    //! Replace the old value, if necessary.
    void push(task_dispatcher* s) {
        task_dispatcher* to_cleanup = nullptr;
        {
            tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
            // Check if we are replacing an existing buffer entry
            if (my_co_scheduler_cache[my_head] != nullptr) {
                to_cleanup = my_co_scheduler_cache[my_head];
            }
            // Store the cached value
            my_co_scheduler_cache[my_head] = s;
            // Move head index to the next slot
            my_head = next_index();
        }
        // Clean up the replaced dispatcher, if any
        if (to_cleanup) {
            internal_task_dispatcher_cleanup(to_cleanup);
        }
    }

    //! Get a cached scheduler if any
    task_dispatcher* pop() {
        tbb::spin_mutex::scoped_lock lock(my_co_cache_mutex);
        // No cached coroutine
        if (internal_empty()) {
            return nullptr;
        }
        // Move head index to the currently available value
        my_head = prev_index();
        // Retrieve the value from the buffer
        task_dispatcher* to_return = my_co_scheduler_cache[my_head];
        // Clear the previous entry
        my_co_scheduler_cache[my_head] = nullptr;
        return to_return;
    }
};
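
// Note: arena_co_cache behaves as a bounded LIFO cache: pop() returns the most recently
// pushed dispatcher, and once cache_capacity entries are stored, push() destroys the entry
// it overwrites. An illustrative usage sketch (the capacity and the d1/d2 names are
// hypothetical, not taken from this header):
//
//     arena_co_cache cache;
//     cache.init(4);                        // room for 4 task_dispatcher pointers
//     cache.push(d1);
//     cache.push(d2);
//     task_dispatcher* d = cache.pop();     // returns d2 (LIFO order)
//     cache.cleanup();                      // destroys any remaining cached dispatchers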

struct stack_anchor_type {
    stack_anchor_type() = default;
    stack_anchor_type(const stack_anchor_type&) = delete;
};

class atomic_flag {
    static const std::uintptr_t SET = 1;
    static const std::uintptr_t UNSET = 0;
    std::atomic<std::uintptr_t> my_state{UNSET};
public:
    bool test_and_set() {
        std::uintptr_t state = my_state.load(std::memory_order_acquire);
        switch (state) {
        case SET:
            return false;
        default: /* busy */
            if (my_state.compare_exchange_strong(state, SET)) {
                // We interrupted a clear transaction
                return false;
            }
            if (state != UNSET) {
                // We lost our epoch
                return false;
            }
            // We are too late but still in the same epoch
            __TBB_fallthrough;
        case UNSET:
            return my_state.compare_exchange_strong(state, SET);
        }
    }
    template <typename Pred>
    bool try_clear_if(Pred&& pred) {
        std::uintptr_t busy = std::uintptr_t(&busy);
        std::uintptr_t state = my_state.load(std::memory_order_acquire);
        if (state == SET && my_state.compare_exchange_strong(state, busy)) {
            if (pred()) {
                return my_state.compare_exchange_strong(busy, UNSET);
            }
            // The result of the next operation is discarded; false should always be returned.
            my_state.compare_exchange_strong(busy, SET);
        }
        return false;
    }
    bool test(std::memory_order order = std::memory_order_acquire) {
        return my_state.load(order) != UNSET;
    }
};
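
// atomic_flag above is effectively a ternary flag: UNSET, SET, or a transient "busy" marker
// (the address of a local variable) installed by try_clear_if() while it evaluates its
// predicate. test_and_set() reports true only to the caller that performs the UNSET -> SET
// transition in the current epoch; try_clear_if() drops the flag back to UNSET only when the
// predicate holds and no concurrent test_and_set() intervened. A minimal usage sketch
// (has_work and no_tasks_left() are illustrative names, not part of this header):
//
//     atomic_flag has_work;
//     if (has_work.test_and_set()) { /* first to signal work in this epoch */ }
//     ...
//     has_work.try_clear_if([] { return no_tasks_left(); });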

//! The structure of an arena, except the array of slots.
/** Separated in order to simplify padding.
    Intrusive list node base class is used by the market to form a list of arenas. **/
// TODO: Analyze arena_base cache line placement
struct arena_base : padded<intrusive_list_node> {
    //! The number of workers that have been marked out by the resource manager to service the arena.
    std::atomic<unsigned> my_num_workers_allotted;   // heavy use in stealing loop

    //! Reference counter for the arena.
    /** Worker and external thread references are counted separately: first several bits are for references
        from external threads or explicit task_arenas (see arena::ref_external_bits below);
        the rest counts the number of workers servicing the arena. */
    std::atomic<unsigned> my_references;     // heavy use in stealing loop

    //! The maximal number of currently busy slots.
    std::atomic<unsigned> my_limit;          // heavy use in stealing loop

    //! Task pool for the tasks scheduled via task::enqueue() method.
    /** Such scheduling guarantees eventual execution even if
        - new tasks are constantly coming (by extracting scheduled tasks in
          relaxed FIFO order);
        - the enqueuing thread does not call any of wait_for_all methods. **/
    task_stream<front_accessor> my_fifo_task_stream; // heavy use in stealing loop

    //! Task pool for the tasks scheduled via tbb::resume() function.
    task_stream<front_accessor> my_resume_task_stream; // heavy use in stealing loop

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Task pool for the tasks with critical property set.
    /** Critical tasks are scheduled for execution ahead of other sources (including the local task pool
        and even bypassed tasks) unless the thread already executes a critical task in an outer
        dispatch loop **/
    // used on the hot path of the task dispatch loop
    task_stream<back_nonnull_accessor> my_critical_task_stream;
#endif

    //! The total number of workers that are requested from the resource manager.
    int my_total_num_workers_requested;

    //! The index in the array of per-priority lists of arenas this object is in.
    /*const*/ unsigned my_priority_level;

    //! The max priority level of the arena in the permit manager.
    std::atomic<bool> my_is_top_priority{false};

    //! Current task pool state and an estimate of the number of available tasks.
    atomic_flag my_pool_state;

    //! The list of local observers attached to this arena.
    observer_list my_observers;
232 
233 #if __TBB_ARENA_BINDING
234     //! Pointer to internal observer that allows to bind threads in arena to certain NUMA node.
235     numa_binding_observer* my_numa_binding_observer{nullptr};
236 #endif /*__TBB_ARENA_BINDING*/
237 
238     // Below are rarely modified members
239 
240     threading_control* my_threading_control;
241 
242     //! Default task group context.
243     d1::task_group_context* my_default_ctx;
244 
245     //! Waiting object for external threads that cannot join the arena.
246     concurrent_monitor my_exit_monitors;
247 
248     //! Coroutines (task_dispathers) cache buffer
249     arena_co_cache my_co_cache;
250 
251     // arena needs an extra worker despite the arena limit
252     atomic_flag my_mandatory_concurrency;
253     // the number of local mandatory concurrency requests
254     int my_mandatory_requests;
255 
    //! The number of slots in the arena.
    unsigned my_num_slots;
    //! The number of reserved slots (can be occupied only by external threads).
    unsigned my_num_reserved_slots;
    //! The number of workers requested by the external thread owning the arena.
    unsigned my_max_num_workers;

    threading_control_client my_tc_client;

#if TBB_USE_ASSERT
    //! Used to trap accesses to the object after its destruction.
    std::uintptr_t my_guard;
#endif /* TBB_USE_ASSERT */
}; // struct arena_base

class arena: public padded<arena_base>
{
public:
    using base_type = padded<arena_base>;

    //! Types of work advertised by advertise_new_work()
    enum new_work_type {
        work_spawned,
        wakeup,
        work_enqueued
    };

    //! Constructor
    arena(threading_control* control, unsigned max_num_workers, unsigned num_reserved_slots, unsigned priority_level);

    //! Allocate an instance of arena.
    static arena& allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                                 unsigned priority_level);

    static arena& create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints = d1::constraints{});

    static unsigned num_arena_slots ( unsigned num_slots, unsigned num_reserved_slots ) {
        return num_reserved_slots == 0 ? num_slots : max(2u, num_slots);
    }

    static int allocation_size( unsigned num_slots ) {
        return sizeof(base_type) + num_slots * (sizeof(mail_outbox) + sizeof(arena_slot) + sizeof(task_dispatcher));
    }
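    // A note on layout: the size above covers the arena header plus, per slot, one mail_outbox,
    // one arena_slot and one task_dispatcher; allocate_arena() is expected to carve all of these
    // out of a single cache-aligned block.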

    //! Get reference to mailbox corresponding to given slot_id
    mail_outbox& mailbox( d1::slot_id slot ) {
        __TBB_ASSERT( slot != d1::no_slot, "affinity should be specified" );

        return reinterpret_cast<mail_outbox*>(this)[-(int)(slot+1)]; // cast to 'int' is redundant but left for readability
    }
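    // The negative indexing above relies on the layout produced at allocation time: the
    // mail_outbox objects are expected to sit immediately before the arena object, so slot 0
    // maps to the outbox right below `this` and higher slot ids move further down in memory.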

    //! Completes arena shutdown, destructs and deallocates it.
    void free_arena();

    //! The number of least significant bits for external references
    static const unsigned ref_external_bits = 12; // up to 4095 external and 1M workers

    //! Reference increment values for externals and workers
    static const unsigned ref_external = 1;
    static const unsigned ref_worker   = 1 << ref_external_bits;
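    // With ref_external_bits == 12, my_references packs both counters into one unsigned value:
    // the low 12 bits count external references (at most 2^12 - 1 == 4095) and the remaining
    // bits count workers. Hence a worker reference adds ref_worker == 4096, and
    // num_workers_active() below recovers the worker count with a right shift by 12.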

    //! The number of workers active in the arena.
    unsigned num_workers_active() const {
        return my_references.load(std::memory_order_acquire) >> ref_external_bits;
    }

    //! Check if the recall is requested by the market.
    bool is_recall_requested() const {
        return num_workers_active() > my_num_workers_allotted.load(std::memory_order_relaxed);
    }

    void request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads = false);

    //! If necessary, raise a flag that there is new job in arena.
    template<arena::new_work_type work_type> void advertise_new_work();

    //! Attempts to steal a task from a randomly chosen arena slot
    d1::task* steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation);

    //! Get a task from a global starvation resistant queue
    template<task_stream_accessor_type accessor>
    d1::task* get_stream_task(task_stream<accessor>& stream, unsigned& hint);

#if __TBB_PREVIEW_CRITICAL_TASKS
    //! Tries to find a critical task in global critical task stream
    d1::task* get_critical_task(unsigned& hint, isolation_type isolation);
#endif

    //! Check if there is a job anywhere in the arena.
    void out_of_work();

    //! Enqueue a task into the starvation-resistant queue
    void enqueue_task(d1::task&, d1::task_group_context&, thread_data&);

    //! Registers the worker with the arena and enters the TBB scheduler dispatch loop
    void process(thread_data&);

    //! Notification that the thread leaves its arena
    void on_thread_leaving(unsigned ref_param);

    //! Check for the presence of enqueued tasks
    bool has_enqueued_tasks();

    //! Check for the presence of any tasks
    bool has_tasks();

    bool is_empty() { return my_pool_state.test() == /* EMPTY */ false; }

    thread_control_monitor& get_waiting_threads_monitor();

    static const std::size_t out_of_arena = ~size_t(0);
    //! Tries to occupy a slot in the arena. On success, returns the slot index; if no slot is available, returns out_of_arena.
    template <bool as_worker>
    std::size_t occupy_free_slot(thread_data&);
    //! Tries to occupy a slot in the specified range.
    std::size_t occupy_free_slot_in_range(thread_data& tls, std::size_t lower, std::size_t upper);

    std::uintptr_t calculate_stealing_threshold();

    unsigned priority_level() { return my_priority_level; }

    bool has_request() { return my_total_num_workers_requested; }

    unsigned references() const { return my_references.load(std::memory_order_acquire); }

    bool is_arena_workerless() const { return my_max_num_workers == 0; }

    void set_top_priority(bool);

    bool is_top_priority() const;

    bool try_join();

    void set_allotment(unsigned allotment);

    int update_concurrency(unsigned concurrency);

    std::pair</*min workers = */ int, /*max workers = */ int> update_request(int mandatory_delta, int workers_delta);

    /** Must be the last data field */
    arena_slot my_slots[1];
}; // class arena

template <arena::new_work_type work_type>
void arena::advertise_new_work() {
    bool is_mandatory_needed = false;
    bool are_workers_needed = false;

    if (work_type != work_spawned) {
        // Local memory fence here and below is required to avoid missed wakeups; see the comment below.
        // Starvation resistant tasks require concurrency, so missed wakeups are unacceptable.
        atomic_fence_seq_cst();
    }

    if (work_type == work_enqueued && my_num_slots > my_num_reserved_slots) {
        is_mandatory_needed = my_mandatory_concurrency.test_and_set();
    }

    // Double-check idiom that, in case of spawning, is deliberately sloppy about memory fences.
    // Technically, to avoid missed wakeups, there should be a full memory fence between the point we
    // released the task pool (i.e. spawned task) and read the arena's state.  However, adding such a
    // fence might hurt overall performance more than it helps, because the fence would be executed
    // on every task pool release, even when stealing does not occur.  Since TBB allows parallelism,
    // but never promises parallelism, the missed wakeup is not a correctness problem.
    are_workers_needed = my_pool_state.test_and_set();

    if (is_mandatory_needed || are_workers_needed) {
        int mandatory_delta = is_mandatory_needed ? 1 : 0;
        int workers_delta = are_workers_needed ? my_max_num_workers : 0;

        if (is_mandatory_needed && is_arena_workerless()) {
            // Set workers_delta to 1 to keep arena invariants consistent
            workers_delta = 1;
        }

        request_workers(mandatory_delta, workers_delta, /* wakeup_threads = */ true);
    }
}
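
// In short: work_spawned relies on the deliberately sloppy double-check above, while wakeup and
// work_enqueued issue a full fence first. work_enqueued may additionally raise the
// mandatory-concurrency flag (when the arena has non-reserved slots), which translates into
// mandatory_delta == 1 and, for a workerless arena, a workers_delta of 1.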

inline d1::task* arena::steal_task(unsigned arena_index, FastRandom& frnd, execution_data_ext& ed, isolation_type isolation) {
    auto slot_num_limit = my_limit.load(std::memory_order_relaxed);
    if (slot_num_limit == 1) {
        // No slots to steal from
        return nullptr;
    }
    // Try to steal a task from a random victim.
    std::size_t k = frnd.get() % (slot_num_limit - 1);
    // The following condition excludes the external thread that might have
    // already taken our previous place in the arena from the list
    // of potential victims. But since such a situation can take
    // place only in case of significant oversubscription, keeping
    // the checks simple seems to be preferable to complicating the code.
    if (k >= arena_index) {
        ++k; // Adjusts random distribution to exclude self
    }
    arena_slot* victim = &my_slots[k];
    d1::task **pool = victim->task_pool.load(std::memory_order_relaxed);
    d1::task *t = nullptr;
    if (pool == EmptyTaskPool || !(t = victim->steal_task(*this, isolation, k))) {
        return nullptr;
    }
    if (task_accessor::is_proxy_task(*t)) {
        task_proxy &tp = *(task_proxy*)t;
        d1::slot_id slot = tp.slot;
        t = tp.extract_task<task_proxy::pool_bit>();
        if (!t) {
            // Proxy was empty, so it's our responsibility to free it
            tp.allocator.delete_object(&tp, ed);
            return nullptr;
        }
        // Note affinity is called for any stolen task (proxy or general)
        ed.affinity_slot = slot;
    } else {
        // Note affinity is called for any stolen task (proxy or general)
        ed.affinity_slot = d1::any_slot;
    }
    // Update task owner thread id to identify stealing
    ed.original_slot = k;
    return t;
}
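
// A note on the proxy branch above: a task_proxy is published both in a task pool and in a
// mailbox, so the steal may race with the mailbox owner. extract_task<pool_bit>() is expected
// to return nullptr when the underlying task was already claimed through the mailbox, in which
// case the thief only disposes of the now-empty proxy.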

template<task_stream_accessor_type accessor>
inline d1::task* arena::get_stream_task(task_stream<accessor>& stream, unsigned& hint) {
    if (stream.empty())
        return nullptr;
    return stream.pop(subsequent_lane_selector(hint));
}

#if __TBB_PREVIEW_CRITICAL_TASKS
// Retrieves a critical task, respecting the isolation level, if provided. The rule is:
// 1) If no outer critical task and no isolation => take any critical task
// 2) If working on an outer critical task and no isolation => cannot take any critical task
// 3) If no outer critical task but isolated => respect isolation
// 4) If working on an outer critical task and isolated => respect isolation
// The hint is used to keep some LIFO-ness; the search starts with the lane that was used during the push operation.
inline d1::task* arena::get_critical_task(unsigned& hint, isolation_type isolation) {
    if (my_critical_task_stream.empty())
        return nullptr;

    if ( isolation != no_isolation ) {
        return my_critical_task_stream.pop_specific( hint, isolation );
    } else {
        return my_critical_task_stream.pop(preceding_lane_selector(hint));
    }
}
#endif // __TBB_PREVIEW_CRITICAL_TASKS

} // namespace r1
} // namespace detail
} // namespace tbb

#endif /* _TBB_arena_H */