xref: /oneTBB/src/tbb/arena.cpp (revision 7b022651)
/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
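// Scheduler observer that applies the NUMA/core-type affinity mask to every thread entering
// the arena (based on the thread's current slot index) and restores the previous mask when
// the thread leaves the scheduler.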
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

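// Create and activate a binding observer only when the constraints can actually affect thread
// placement: a specific core type on a system with more than one core type, a specific NUMA
// node on a multi-node system, or a limit on the number of threads per core.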
numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_ARENA_BINDING*/

void arena::on_thread_leaving(unsigned ref_param) {
    //
    // The implementation of arena destruction synchronization logic contained various
    // bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the current design.
    //
    // When fire-and-forget tasks are used (scheduled via task::enqueue()),
    // an external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when the last thread is
    // leaving it and the arena's state is EMPTY (that is, its external thread left
    // and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing workers migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may
    // arrive, finish the remaining work, set the arena state to empty, and leave,
    // decrementing its refcount and destroying the arena. Then the current thread
    // would destroy the arena a second time. To preclude this, a local copy of the
    // outstanding request value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish a non-zero request. Then all the threads can be
    // revoked (as described above), leaving this thread the last one and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by the refcount this
    // thread holds to it. In case of workers, the market's liveness is ensured by the
    // RML connection rundown protocol, according to which the client (i.e. the market)
    // lives until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return into RML.
    //
    // Thus if we decremented the refcount to zero, we ask the market to check the arena
    // state (including whether it is still alive) under a lock.
    //

    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");

    // When there are no workers, someone must free the arena, because
    // without workers no one calls out_of_work().
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // Do not access `this` below: the arena might already be destroyed.
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourselves
            free_arena();
        }
    }
}

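// Scan the slot range [lower, upper) for a free slot. The search starts from the slot this
// thread occupied last time (if it falls into the range) or from a random slot otherwise,
// and wraps around so that every slot in the range is tried once.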
std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

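// Compute the deepest stack address this thread may reach while stealing. A local variable is
// used as an anchor for the current stack position; the threshold is derived from it and the
// configured worker stack size.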
std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There must be no notified local observers when entering the arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Wait on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (the arena may still be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

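// The arena lives in a single cache-aligned block: the per-slot mailboxes are laid out in
// front of the arena object, which is followed by the slot array and the per-slot default
// task dispatchers; allocation_size() covers the whole layout.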
arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                             unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena() {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup the coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots - 1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

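// Check every known source of work: the task pools of the slots below the current limit, the
// FIFO (enqueued) stream, the resume stream and, when critical tasks are enabled, the
// critical task stream.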
bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // Mandatory concurrency is cleared before my_pool_state to keep the arena invariants
    // consistent. Otherwise, we might observe my_pool_state == false while
    // my_mandatory_concurrency is still set, which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We had set workers_delta to 1 when mandatory concurrency was enabled, so revert it now
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

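// A worker tries to join the arena; it succeeds only while the number of active workers is
// below the current allotment, in which case the arena's reference counter is bumped by
// ref_worker.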
bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

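// Update the number of workers allotted to the arena and return the signed difference from
// the previous value (positive if more workers may now join, negative otherwise).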
int arena::update_concurrency(unsigned allotment) {
    int delta = allotment - my_num_workers_allotted.load(std::memory_order_relaxed);
    if (delta != 0) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
    return delta;
}

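// Recompute the arena's demand after a change in mandatory and/or regular worker requests.
// The minimum request is 1 while any mandatory (enqueued-work) request is outstanding,
// otherwise 0. The maximum request is the accumulated number of requested workers clamped
// into [0, my_max_num_workers]; for a workerless arena with an outstanding mandatory request
// the upper bound is 1 instead. For example, enabling mandatory concurrency on a workerless
// arena (mandatory_delta == 1, workers_delta == 1) yields the pair {1, 1}.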
std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate max request
    my_total_num_workers_requested += workers_delta;
    // Clamp worker request into interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}

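// Bind the context to the calling thread, tag the task with no isolation, push it into the
// arena's FIFO (enqueued) stream, and advertise the newly enqueued work so that workers can
// be requested if needed.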
void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add the public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // We should not publish the arena until all of its fields are initialized
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

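// Convert between the public task_arena::priority enumeration and the internal zero-based
// priority level, where a smaller level means a higher priority. Assuming the usual header
// definitions (d1::num_priority_levels == 3 and priority::low/normal/high defined as
// 1x/2x/3x d1::priority_stride), high/normal/low map to levels 0/1/2, and arena_priority()
// performs the inverse mapping.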
unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still in use by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // Increases threading_control's ref count for the task_arena
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
              ta->my_arena.load(std::memory_order_relaxed)
            : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

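// RAII helper used by task_arena::execute and the delegated-task path: it temporarily moves
// the calling thread into the target arena (occupying the given slot and its default task
// dispatcher), and on destruction releases the slot and restores the thread's original arena,
// slot index, task dispatcher and execution data.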
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need
            // to notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext  m_orig_execute_data_ext{};
    arena*              m_orig_arena{ nullptr };
    observer_proxy*     m_orig_last_observer{ nullptr };
    task_dispatcher*    m_task_dispatcher{ nullptr };
    unsigned            m_orig_slot_index{};
    bool                m_orig_fifo_tasks_allowed{};
    bool                m_orig_critical_task_allowed{};
};

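// Task used by task_arena::execute when the calling thread cannot occupy a slot in the target
// arena: the delegate is wrapped into this task and enqueued, while the calling thread waits
// on the arena's exit monitor. finalize() releases the wait context and notifies the waiting
// thread; the destructor spins on m_completed to close the race with that notification.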
class delegated_task : public d1::task {
    d1::delegate_base&  m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context&   m_wait_ctx;
    std::atomic<bool>   m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called before m_monitor is notified, because the waiting
        // thread may be released as soon as m_wait_ctx is released. To close that race
        // we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            std::size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // the exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // Process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context");
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base* ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get the parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency == 1, nullptr);
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency == d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

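// Run the delegate under an isolation tag (either the caller-supplied one or a tag derived
// from the delegate's address) so that, while waiting inside it, the thread only picks up
// tasks with the same isolation; the previous isolation value is restored on completion.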
void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb