xref: /oneTBB/src/tbb/arena.cpp (revision c4a799df)
/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_ARENA_BINDING*/
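
// Illustrative sketch only (not part of this translation unit): how client code typically
// ends up creating the binding observer above, assuming the public oneTBB API
// (tbb::info::numa_nodes and tbb::task_arena::constraints):
//
//     std::vector<tbb::numa_node_id> nodes = tbb::info::numa_nodes();
//     tbb::task_arena numa_arena(tbb::task_arena::constraints{}.set_numa_id(nodes[0]));
//     numa_arena.execute([] { /* runs with the NUMA affinity mask applied */ });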

void arena::on_thread_leaving(unsigned ref_param) {
    //
    // The implementation of arena destruction synchronization logic contained various
    // bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the framework of the
    // current design.
    //
    // In case of using fire-and-forget tasks (scheduled via task::enqueue())
    // an external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when both the last
    // thread is leaving it and the arena's state is EMPTY (that is, its external thread
    // left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread, for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing workers migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may arrive,
    // finish the remaining work, set the arena state to empty, and leave, decrementing its
    // refcount and destroying the arena. Then the current thread would destroy the arena
    // a second time. To preclude this, a local copy of the outstanding request
    // value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above) leaving this thread the last one, and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by the refcount this thread
    // holds to it. In case of workers the market's liveness is ensured by the RML connection
    // rundown protocol, according to which the client (i.e. the market) lives
    // until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return into RML.
    //
    // Thus if we decremented the refcount to zero we ask the market to check the arena
    // state (including whether it is still alive) under a lock.
    //

    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");

    // When there are no workers someone must free the arena, since
    // without workers no one calls out_of_work().
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // Do not access `this` past this point: it might already be destroyed.
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourselves
            free_arena();
        }
    }
}
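
// Minimal illustration of the constraint described above (hypothetical interleaving, using
// the names from on_thread_leaving()):
//
//     this thread                                     another thread (holds the last reference)
//     -----------                                     ------------------------------------------
//     tc = my_threading_control;
//     snapshot = tc->prepare_client_destruction(...);
//     my_references.fetch_sub(ref_param);  // result != 0
//                                                     my_references.fetch_sub(...);  // reaches 0
//                                                     tc->try_destroy_client(...) -> free_arena()
//     // any use of `this` at this point would be a potential use-after-free
//
// Hence the threading_control pointer and the client snapshot are captured before the
// decrement, and only those locals are used afterwards.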

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}
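
// atomic_update() is a helper defined elsewhere in the scheduler; here it monotonically raises
// my_limit so the published slot count never moves backwards under concurrent updates. A rough,
// self-contained sketch of that idiom (not the helper's actual definition):
//
//     unsigned expected = my_limit.load(std::memory_order_relaxed);
//     while (std::less<unsigned>()(expected, unsigned(index + 1)) &&
//            !my_limit.compare_exchange_weak(expected, unsigned(index + 1))) {
//         // compare_exchange_weak refreshes `expected` on failure; retry until the stored
//         // value is already large enough or our update wins
//     }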

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}
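
// The address of the local `anchor` approximates the base of the calling thread's dispatch
// stack. The helper above (defined elsewhere) derives from it the lowest stack address at
// which the dispatcher is still allowed to steal, so that deeply nested stealing cannot
// overflow the worker stack. Conceptually (the exact fraction is the helper's detail):
//
//     threshold ~= reinterpret_cast<std::uintptr_t>(&anchor) - worker_stack_size / 2;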

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Wait on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (the arena may still be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}
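
// Rough sketch of the single cache-aligned allocation, inferred from allocate_arena() above
// and the arena constructor (the authoritative sizes come from allocation_size()):
//
//     | mail_outbox x N | arena (arena_base + my_slots[N]) | task_dispatcher x N | ...
//       ^ storage         ^ object constructed above
//
// The mailboxes sit in front of the arena object and are indexed backwards via mailbox(i),
// which is why free_arena() recovers the storage pointer as &mailbox(my_num_slots - 1).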

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // Clear the flags in this order to keep the arena invariants consistent; otherwise we could
    // observe my_pool_state == false while my_mandatory_concurrency == true, which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We set workers_delta to 1 when mandatory concurrency was enabled, so revert it now
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

int arena::update_concurrency(unsigned allotment) {
    // Capture the delta before the allotment is overwritten; otherwise the result would always be zero.
    int delta = static_cast<int>(allotment) - static_cast<int>(my_num_workers_allotted.load(std::memory_order_relaxed));
    set_allotment(allotment);
    return delta;
}

std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate max request
    my_total_num_workers_requested += workers_delta;
    // Clamp worker request into interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}
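
// Worked example with hypothetical numbers: with my_max_num_workers == 4 and
// my_total_num_workers_requested accumulated to 6, the max request is clamped to 4.
// For a workerless arena (my_max_num_workers == 0) that still has an active mandatory
// request, the upper bound becomes 1, so at most one (mandatory) worker is requested.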

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // We should not publish arena until all fields are initialized
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}
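
// Worked example, assuming the current public definitions (d1::num_priority_levels == 3 and
// the priority enumerators equal to 1x/2x/3x d1::priority_stride for low/normal/high):
//
//     arena_priority_level(priority::high)   == 3 - 3 == 0   // level 0 is the top priority
//     arena_priority_level(priority::normal) == 3 - 2 == 1
//     arena_priority(1) == priority((3 - 1) * d1::priority_stride) == priority::normal
//
// i.e. the two functions are inverses of each other for the predefined values.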

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // Increase threading_control's ref count for the task_arena
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
              ta->my_arena.load(std::memory_order_relaxed)
            : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}
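
// User-level view of this entry point (illustrative only, public API): the functor passed to
// tbb::task_arena::enqueue() is wrapped into a d1::task by the header layer before it reaches
// task_arena_impl::enqueue().
//
//     tbb::task_arena a;
//     a.enqueue([] { /* fire-and-forget work, executed by a thread of arena `a` */ });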

class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one less worker.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext    m_orig_execute_data_ext{};
    arena*              m_orig_arena{ nullptr };
    observer_proxy*     m_orig_last_observer{ nullptr };
    task_dispatcher*    m_task_dispatcher{ nullptr };
    unsigned            m_orig_slot_index{};
    bool                m_orig_fifo_tasks_allowed{};
    bool                m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base&  m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context&   m_wait_ctx;
    std::atomic<bool>   m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called before m_monitor is notified because the waiting
        // thread can be released as soon as the wait context is released.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task( dt, exec_context, *td);
            std::size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // Process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}
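
// User-level view of execute() (illustrative only, public API). The calling thread joins the
// arena when a slot is available; otherwise the delegate is enqueued as a delegated_task and
// the caller blocks on my_exit_monitors until it completes. The functor's result is returned
// to the caller by the header layer:
//
//     tbb::task_arena a(/*max_concurrency=*/4);
//     int result = a.execute([] {
//         return 42;   // runs inside arena `a`
//     });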

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}
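
// Illustrative note: this same routine backs both tbb::task_arena::max_concurrency() and
// tbb::this_task_arena::max_concurrency(); the latter reaches it with ta == nullptr and thus
// reports the concurrency of the arena the calling thread currently works in, or the default:
//
//     int n = tbb::this_task_arena::max_concurrency();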

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}
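
// User-level view (illustrative only): this is the entry point behind
// tbb::this_task_arena::isolate().
//
//     tbb::this_task_arena::isolate([] {
//         // While waiting inside this region the thread does not pick up unrelated
//         // outer tasks; only work tagged with this isolation value is taken.
//         tbb::parallel_for(0, 100, [](int) { /* ... */ });
//     });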

} // namespace r1
} // namespace detail
} // namespace tbb