xref: /oneTBB/src/tbb/arena.cpp (revision f8f7f738)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "task_dispatcher.h"
18 #include "governor.h"
19 #include "arena.h"
20 #include "itt_notify.h"
21 #include "semaphore.h"
22 #include "waiters.h"
23 #include "oneapi/tbb/detail/_task.h"
24 #include "oneapi/tbb/info.h"
25 #include "oneapi/tbb/tbb_allocator.h"
26 
27 #include <atomic>
28 #include <cstring>
29 #include <functional>
30 
31 namespace tbb {
32 namespace detail {
33 namespace r1 {
34 
35 #if __TBB_ARENA_BINDING
36 class numa_binding_observer : public tbb::task_scheduler_observer {
37     binding_handler* my_binding_handler;
38 public:
39     numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
40         : task_scheduler_observer(*ta)
41         , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
42     {}
43 
44     void on_scheduler_entry( bool ) override {
45         apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
46     }
47 
48     void on_scheduler_exit( bool ) override {
49         restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
50     }
51 
52     ~numa_binding_observer() override{
53         destroy_binding_handler(my_binding_handler);
54     }
55 };
56 
57 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
58     numa_binding_observer* binding_observer = nullptr;
59     if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
60         binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
61         __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
62         binding_observer->observe(true);
63     }
64     return binding_observer;
65 }
66 
67 void destroy_binding_observer( numa_binding_observer* binding_observer ) {
68     __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
69     binding_observer->observe(false);
70     binding_observer->~numa_binding_observer();
71     deallocate_memory(binding_observer);
72 }
73 #endif /*__TBB_ARENA_BINDING*/
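
// Illustrative usage sketch (not part of this translation unit): how the binding observer above is
// typically reached from user code through the public constraints API. The workload and the exact
// header set are assumptions; tbb::info::numa_nodes() and task_arena::constraints are assumed to
// match the public oneTBB interface of this revision.
#if 0
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/parallel_for.h"
#include "oneapi/tbb/task_arena.h"
#include <vector>

void run_on_each_numa_node() {
    // One arena per NUMA node. Initializing an arena with a numa_id constraint ends up in
    // r1::construct_binding_observer(), which installs a numa_binding_observer for that arena.
    std::vector<tbb::numa_node_id> nodes = tbb::info::numa_nodes();
    for (tbb::numa_node_id id : nodes) {
        tbb::task_arena arena(tbb::task_arena::constraints(id));
        arena.execute([] {
            tbb::parallel_for(0, 1000, [](int) { /* NUMA-local work */ });
        });
    }
}
#endif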
74 
75 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
76     if ( lower >= upper ) return out_of_arena;
77     // Start the search for an empty slot from the one we occupied last time
78     std::size_t index = tls.my_arena_index;
79     if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
80     __TBB_ASSERT( index >= lower && index < upper, nullptr);
81     // Find a free slot
82     for ( std::size_t i = index; i < upper; ++i )
83         if (my_slots[i].try_occupy()) return i;
84     for ( std::size_t i = lower; i < index; ++i )
85         if (my_slots[i].try_occupy()) return i;
86     return out_of_arena;
87 }
88 
89 template <bool as_worker>
90 std::size_t arena::occupy_free_slot(thread_data& tls) {
91     // First, external threads try to occupy the reserved slots
92     std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
93     if ( index == out_of_arena ) {
94         // Then all threads try to occupy the non-reserved slots
95         index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
96         // Likely this arena is already saturated
97         if ( index == out_of_arena )
98             return out_of_arena;
99     }
100 
101     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
102     return index;
103 }
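
// Minimal standalone model (an assumed simplification, not the arena's real data structures) of the
// slot search above: start from a preferred index, scan to the end of the range, then wrap around.
#if 0
#include <atomic>
#include <cstddef>
#include <vector>

constexpr std::size_t out_of_range = std::size_t(-1);

std::size_t occupy_in_range(std::vector<std::atomic<bool>>& occupied,
                            std::size_t preferred, std::size_t lower, std::size_t upper) {
    if (lower >= upper) return out_of_range;
    std::size_t start = (preferred >= lower && preferred < upper) ? preferred : lower;
    for (std::size_t i = start; i < upper; ++i) {            // forward scan from the preferred slot
        bool expected = false;
        if (occupied[i].compare_exchange_strong(expected, true)) return i; // analogous to try_occupy()
    }
    for (std::size_t i = lower; i < start; ++i) {            // wrap around to the beginning of the range
        bool expected = false;
        if (occupied[i].compare_exchange_strong(expected, true)) return i;
    }
    return out_of_range;
}
#endif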
104 
105 std::uintptr_t arena::calculate_stealing_threshold() {
106     stack_anchor_type anchor;
107     return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
108 }
109 
110 void arena::process(thread_data& tls) {
111     governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
112     __TBB_ASSERT( is_alive(my_guard), nullptr);
113     __TBB_ASSERT( my_num_slots >= 1, nullptr);
114 
115     std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
116     if (index == out_of_arena) {
117         on_thread_leaving<ref_worker>();
118         return;
119     }
120     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
121     tls.attach_arena(*this, index);
122     // The worker thread enters the dispatch loop to look for work
123     tls.my_inbox.set_is_idle(true);
124     if (tls.my_arena_slot->is_task_pool_published()) {
125         tls.my_inbox.set_is_idle(false);
126     }
127 
128     task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
129     tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
130     __TBB_ASSERT(task_disp.can_steal(), nullptr);
131 
132     __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
133     my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
134 
135     // Wait on a special object tied to this arena
136     outermost_worker_waiter waiter(*this);
137     d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
138     // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
139     // attached to it.
140     tls.my_inbox.set_is_idle(true);
141 
142     __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
143     __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
144     __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);
145 
146     my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
147     tls.my_last_observer = nullptr;
148 
149     tls.leave_task_dispatcher();
150 
151     // Arena slot detach (arena may be used in market::process)
152     // TODO: Consider moving several of the calls below into a new method (e.g. detach_arena).
153     tls.my_arena_slot->release();
154     tls.my_arena_slot = nullptr;
155     tls.my_inbox.detach();
156     __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
157     __TBB_ASSERT(is_alive(my_guard), nullptr);
158 
159     // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
160     // that the arena may be temporarily left unpopulated by threads. See the comments
161     // in arena::on_thread_leaving() for more details.
162     on_thread_leaving<ref_worker>();
163     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
164 }
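
// Summary of the worker flow above (descriptive only): occupy a non-reserved slot, attach this
// thread_data to the arena, run the outermost dispatch loop via local_wait_for_all() with an
// outermost_worker_waiter until the arena runs out of work, notify exit observers, detach the
// slot and mailbox, and finally drop the worker reference via on_thread_leaving<ref_worker>().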
165 
166 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
167 {
168     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
169     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
170     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
171     my_market = &m;
172     my_limit = 1;
173     // Two slots are mandatory: one for the external thread and one for a worker (required to support starvation-resistant tasks).
174     my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
175     my_num_reserved_slots = num_reserved_slots;
176     my_max_num_workers = num_slots-num_reserved_slots;
177     my_priority_level = priority_level;
178     my_references = ref_external; // accounts for the external thread
179     my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
180     my_observers.my_arena = this;
181     my_co_cache.init(4 * num_slots);
182     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
183     // Initialize the default context. It should be allocated before task_dispatcher construction.
184     my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
185         d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
186     // Construct slots. Mark internal synchronization elements for the tools.
187     task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
188     for( unsigned i = 0; i < my_num_slots; ++i ) {
189         // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
190         __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
191         __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
192         mailbox(i).construct();
193         my_slots[i].init_task_streams(i);
194         my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
195         my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
196     }
197     my_fifo_task_stream.initialize(my_num_slots);
198     my_resume_task_stream.initialize(my_num_slots);
199 #if __TBB_PREVIEW_CRITICAL_TASKS
200     my_critical_task_stream.initialize(my_num_slots);
201 #endif
202 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
203     my_local_concurrency_requests = 0;
204     my_local_concurrency_flag.clear();
205     my_global_concurrency_mode.store(false, std::memory_order_relaxed);
206 #endif
207 }
208 
209 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
210                               unsigned priority_level )
211 {
212     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
213     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
214     __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
215     std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
216     unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
217     // Zero all slots to indicate that they are empty
218     std::memset( storage, 0, n );
219     return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
220         arena(m, num_slots, num_reserved_slots, priority_level);
221 }
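
// Layout sketch of the single cache-aligned block produced above (an assumed reading of
// allocation_size(); counts are per num_arena_slots(num_slots, num_reserved_slots)):
//
//   storage -> [ mail_outbox x num_slots ]                  // mailboxes precede the arena object
//   arena   -> [ arena_base | arena_slot x num_slots ]      // 'this' is placed right past the mailboxes
//              [ task_dispatcher x num_slots ]              // see base_td_pointer in the constructor
//
// free_arena() recovers the original allocation as &mailbox(my_num_slots - 1), i.e. the start of the block.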
222 
223 void arena::free_arena () {
224     __TBB_ASSERT( is_alive(my_guard), nullptr);
225     __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
226     __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
227     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
228                   "Inconsistent state of a dying arena" );
229 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
230     __TBB_ASSERT( !my_global_concurrency_mode, nullptr);
231 #endif
232 #if __TBB_ARENA_BINDING
233     if (my_numa_binding_observer != nullptr) {
234         destroy_binding_observer(my_numa_binding_observer);
235         my_numa_binding_observer = nullptr;
236     }
237 #endif /*__TBB_ARENA_BINDING*/
238     poison_value( my_guard );
239     for ( unsigned i = 0; i < my_num_slots; ++i ) {
240         // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
241         // TODO: understand the assertion and modify
242         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
243         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
244         my_slots[i].free_task_pool();
245         mailbox(i).drain();
246         my_slots[i].my_default_task_dispatcher->~task_dispatcher();
247     }
248     __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
249     __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
250     // Clean up the coroutine/scheduler cache
251     my_co_cache.cleanup();
252     my_default_ctx->~task_group_context();
253     cache_aligned_deallocate(my_default_ctx);
254 #if __TBB_PREVIEW_CRITICAL_TASKS
255     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
256 #endif
257     // remove an internal reference
258     my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
259 
260     // Clearing the observer list enforces synchronization with observe(false)
261     my_observers.clear();
262 
263     void* storage  = &mailbox(my_num_slots-1);
264     __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
265     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, nullptr);
266     this->~arena();
267 #if TBB_USE_ASSERT > 1
268     std::memset( storage, 0, allocation_size(my_num_slots) );
269 #endif /* TBB_USE_ASSERT */
270     cache_aligned_deallocate( storage );
271 }
272 
273 bool arena::has_enqueued_tasks() {
274     return !my_fifo_task_stream.empty();
275 }
276 
277 bool arena::is_out_of_work() {
278 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
279     if (my_local_concurrency_flag.try_clear_if([this] {
280         return !has_enqueued_tasks();
281     })) {
282         my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
283     }
284 #endif
285 
286     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
287     switch (my_pool_state.load(std::memory_order_acquire)) {
288     case SNAPSHOT_EMPTY:
289         return true;
290     case SNAPSHOT_FULL: {
291         // Use unique id for "busy" in order to avoid ABA problems.
292         const pool_state_t busy = pool_state_t(&busy);
293         // Helper for CAS execution
294         pool_state_t expected_state;
295 
296         // Request permission to take snapshot
297         expected_state = SNAPSHOT_FULL;
298         if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
299             // Got permission. Take the snapshot.
300             // NOTE: This is not a lock, as the state can be set to FULL at
301             //       any moment by a thread that spawns/enqueues a new task.
302             std::size_t n = my_limit.load(std::memory_order_acquire);
303             // Make local copies of the volatile parameters. Any change to them while
304             // the snapshot is being taken invalidates the attempt and returns
305             // this thread to the dispatch loop.
306             std::size_t k;
307             for (k = 0; k < n; ++k) {
308                 if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
309                     my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
310                 {
311                     // The k-th primary task pool is not empty, i.e. it contains tasks.
312                     break;
313                 }
314                 if (my_pool_state.load(std::memory_order_acquire) != busy)
315                     return false; // the work was published
316             }
317             bool work_absent = k == n;
318             // Test and test-and-set.
319             if (my_pool_state.load(std::memory_order_acquire) == busy) {
320                 bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
321 #if __TBB_PREVIEW_CRITICAL_TASKS
322                 no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
323 #endif
324                 work_absent = work_absent && no_stream_tasks;
325                 if (work_absent) {
326                     // save current demand value before setting SNAPSHOT_EMPTY,
327                     // to avoid race with advertise_new_work.
328                     int current_demand = (int)my_max_num_workers;
329                     expected_state = busy;
330                     if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
331                         // This thread transitioned pool to empty state, and thus is
332                         // responsible for telling the market that there is no work to do.
333                         my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
334                         return true;
335                     }
336                     return false;
337                 }
338                 // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
339                 expected_state = busy;
340                 my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
341             }
342         }
343         return false;
344     }
345     default:
346         // Another thread is taking a snapshot.
347         return false;
348     }
349 }
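
// Standalone model (assumed simplification; the POOL_* values are illustrative) of the protocol above:
// one thread CASes FULL -> busy, verifies that no work exists while the state stays busy, then either
// commits busy -> EMPTY or rolls back busy -> FULL if new work may have been published meanwhile.
#if 0
#include <atomic>
#include <cstdint>
#include <functional>

using pool_state = std::uintptr_t;
constexpr pool_state POOL_EMPTY = 0;
constexpr pool_state POOL_FULL  = pool_state(-1);

bool try_transition_to_empty(std::atomic<pool_state>& state,
                             const std::function<bool()>& work_absent) {
    pool_state expected = POOL_FULL;
    const pool_state busy = pool_state(&expected);  // unique per call, like pool_state_t(&busy) above
    if (!state.compare_exchange_strong(expected, busy))
        return false;                               // another thread is snapshotting, or already EMPTY
    if (work_absent() && state.load(std::memory_order_acquire) == busy) {
        expected = busy;
        if (state.compare_exchange_strong(expected, POOL_EMPTY))
            return true;                            // this thread is responsible for lowering the demand
        return false;                               // new work was published while committing
    }
    expected = busy;
    state.compare_exchange_strong(expected, POOL_FULL); // undo FULL -> busy unless someone else did
    return false;
}
#endif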
350 
351 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
352     task_group_context_impl::bind_to(ctx, &td);
353     task_accessor::context(t) = &ctx;
354     task_accessor::isolation(t) = no_isolation;
355     my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
356     advertise_new_work<work_enqueued>();
357 }
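
// Hedged usage sketch: a user-level enqueue ends up in arena::enqueue_task() above, pushing the task
// into my_fifo_task_stream and advertising new mandatory work to the market.
#if 0
#include "oneapi/tbb/task_arena.h"

void fire_and_forget(tbb::task_arena& arena) {
    arena.enqueue([] { /* background work; enqueue() does not wait for completion */ });
}
#endif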
358 
359 } // namespace r1
360 } // namespace detail
361 } // namespace tbb
362 
363 // Enable task_arena.h
364 #include "oneapi/tbb/task_arena.h" // task_arena_base
365 
366 namespace tbb {
367 namespace detail {
368 namespace r1 {
369 
370 #if TBB_USE_ASSERT
371 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
372     bool is_arena_priority_correct =
373         a_priority == tbb::task_arena::priority::high   ||
374         a_priority == tbb::task_arena::priority::normal ||
375         a_priority == tbb::task_arena::priority::low;
376     __TBB_ASSERT( is_arena_priority_correct,
377                   "Task arena priority should be equal to one of the predefined values." );
378 }
379 #else
380 void assert_arena_priority_valid( tbb::task_arena::priority ) {}
381 #endif
382 
383 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
384     assert_arena_priority_valid( a_priority );
385     return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
386 }
387 
388 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
389     auto priority = tbb::task_arena::priority(
390         (market::num_priority_levels - priority_level) * d1::priority_stride
391     );
392     assert_arena_priority_valid( priority );
393     return priority;
394 }
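
// Worked example of the mapping above, assuming market::num_priority_levels == 3 and the public
// values low/normal/high == 1/2/3 * d1::priority_stride (the current oneTBB definitions):
//   arena_priority_level(priority::high)   -> 3 - 3 = 0   (the highest priority is the smallest level)
//   arena_priority_level(priority::normal) -> 3 - 2 = 1
//   arena_priority_level(priority::low)    -> 3 - 1 = 2
// and arena_priority() inverts the mapping, e.g. arena_priority(1) == priority::normal.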
395 
396 struct task_arena_impl {
397     static void initialize(d1::task_arena_base&);
398     static void terminate(d1::task_arena_base&);
399     static bool attach(d1::task_arena_base&);
400     static void execute(d1::task_arena_base&, d1::delegate_base&);
401     static void wait(d1::task_arena_base&);
402     static int max_concurrency(const d1::task_arena_base*);
403     static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
404 };
405 
406 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
407     task_arena_impl::initialize(ta);
408 }
409 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
410     task_arena_impl::terminate(ta);
411 }
412 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
413     return task_arena_impl::attach(ta);
414 }
415 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
416     task_arena_impl::execute(ta, d);
417 }
418 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
419     task_arena_impl::wait(ta);
420 }
421 
422 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
423     return task_arena_impl::max_concurrency(ta);
424 }
425 
426 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
427     task_arena_impl::enqueue(t, nullptr, ta);
428 }
429 
430 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
431     task_arena_impl::enqueue(t, &ctx, ta);
432 }
433 
434 void task_arena_impl::initialize(d1::task_arena_base& ta) {
435     // Enforce global market initialization to properly initialize soft limit
436     (void)governor::get_thread_data();
437     if (ta.my_max_concurrency < 1) {
438 #if __TBB_ARENA_BINDING
439         d1::constraints arena_constraints = d1::constraints{}
440             .set_core_type(ta.core_type())
441             .set_max_threads_per_core(ta.max_threads_per_core())
442             .set_numa_id(ta.my_numa_id);
443         ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
444 #else /*!__TBB_ARENA_BINDING*/
445         ta.my_max_concurrency = (int)governor::default_num_threads();
446 #endif /*!__TBB_ARENA_BINDING*/
447     }
448 
449     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
450     unsigned priority_level = arena_priority_level(ta.my_priority);
451     arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
452     ta.my_arena.store(a, std::memory_order_release);
453     // add an internal market reference; a public reference was added in create_arena
454     market::global_market( /*is_public=*/false);
455 #if __TBB_ARENA_BINDING
456     a->my_numa_binding_observer = construct_binding_observer(
457         static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
458 #endif /*__TBB_ARENA_BINDING*/
459 }
460 
461 void task_arena_impl::terminate(d1::task_arena_base& ta) {
462     arena* a = ta.my_arena.load(std::memory_order_relaxed);
463     assert_pointer_valid(a);
464     a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
465     a->on_thread_leaving<arena::ref_external>();
466     ta.my_arena.store(nullptr, std::memory_order_relaxed);
467 }
468 
469 bool task_arena_impl::attach(d1::task_arena_base& ta) {
470     __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
471     thread_data* td = governor::get_thread_data_if_initialized();
472     if( td && td->my_arena ) {
473         arena* a = td->my_arena;
474         // There is an active arena to attach to.
475         // It is still used by the calling thread, so it won't be destroyed right away.
476         __TBB_ASSERT(a->my_references > 0, nullptr);
477         a->my_references += arena::ref_external;
478         ta.my_num_reserved_slots = a->my_num_reserved_slots;
479         ta.my_priority = arena_priority(a->my_priority_level);
480         ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
481         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
482         ta.my_arena.store(a, std::memory_order_release);
483         // increases the market's ref count for the task_arena
484         market::global_market( /*is_public=*/true );
485         return true;
486     }
487     return false;
488 }
489 
490 void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
491     thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
492     assert_pointer_valid(td, "thread_data pointer should not be null");
493     arena* a = ta ?
494               ta->my_arena.load(std::memory_order_relaxed)
495             : td->my_arena
496     ;
497     assert_pointer_valid(a, "arena pointer should not be null");
498     auto* ctx = c ? c : a->my_default_ctx;
499     assert_pointer_valid(ctx, "context pointer should not be null");
500     // Is there a better place for checking the state of ctx?
501     __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
502                  "The task will not be executed because its task_group_context is cancelled.");
503     a->enqueue_task(t, *ctx, *td);
504 }
505 
506 class nested_arena_context : no_copy {
507 public:
508     nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
509         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
510     {
511         if (td.my_arena != &nested_arena) {
512             m_orig_arena = td.my_arena;
513             m_orig_slot_index = td.my_arena_index;
514             m_orig_last_observer = td.my_last_observer;
515 
516             td.detach_task_dispatcher();
517             td.attach_arena(nested_arena, slot_index);
518             if (td.my_inbox.is_idle_state(true))
519                 td.my_inbox.set_is_idle(false);
520             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
521             td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);
522 
523             // If the calling thread occupies a slot outside the external thread reserve, we need to notify the
524             // market that this arena requires one worker less.
525             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
526                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
527             }
528 
529             td.my_last_observer = nullptr;
530             // The task_arena::execute method considers each calling thread as an external thread.
531             td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
532         }
533 
534         m_task_dispatcher = td.my_task_dispatcher;
535         m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
536         m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
537         m_task_dispatcher->m_properties.critical_task_allowed = true;
538 
539         execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
540         ed_ext.context = td.my_arena->my_default_ctx;
541         ed_ext.original_slot = td.my_arena_index;
542         ed_ext.affinity_slot = d1::no_slot;
543         ed_ext.task_disp = td.my_task_dispatcher;
544         ed_ext.isolation = no_isolation;
545 
546         __TBB_ASSERT(td.my_arena_slot, nullptr);
547         __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
548         __TBB_ASSERT(td.my_task_dispatcher, nullptr);
549     }
550     ~nested_arena_context() {
551         thread_data& td = *m_task_dispatcher->m_thread_data;
552         __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
553         m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
554         m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
555         if (m_orig_arena) {
556             td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
557             td.my_last_observer = m_orig_last_observer;
558 
559             // Notify the market that this thread is releasing one slot
560             // that can be used by a worker thread.
561             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
562                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
563             }
564 
565             td.leave_task_dispatcher();
566             td.my_arena_slot->release();
567             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
568 
569             td.attach_arena(*m_orig_arena, m_orig_slot_index);
570             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
571             __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
572         }
573         td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
574     }
575 
576 private:
577     execution_data_ext    m_orig_execute_data_ext{};
578     arena*              m_orig_arena{ nullptr };
579     observer_proxy*     m_orig_last_observer{ nullptr };
580     task_dispatcher*    m_task_dispatcher{ nullptr };
581     unsigned            m_orig_slot_index{};
582     bool                m_orig_fifo_tasks_allowed{};
583     bool                m_orig_critical_task_allowed{};
584 };
585 
586 class delegated_task : public d1::task {
587     d1::delegate_base&  m_delegate;
588     concurrent_monitor& m_monitor;
589     d1::wait_context&   m_wait_ctx;
590     std::atomic<bool>   m_completed;
591     d1::task* execute(d1::execution_data& ed) override {
592         const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
593         execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
594         __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
595             "The execute data shall point to the current task dispatcher execute data");
596         __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);
597 
598         ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
599         bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
600         try_call([&] {
601             m_delegate();
602         }).on_completion([&] {
603             ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
604             ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
605         });
606 
607         finalize();
608         return nullptr;
609     }
610     d1::task* cancel(d1::execution_data&) override {
611         finalize();
612         return nullptr;
613     }
614     void finalize() {
615         m_wait_ctx.release(); // must precede the wakeup
616         m_monitor.notify([this](std::uintptr_t ctx) {
617             return ctx == std::uintptr_t(&m_delegate);
618         }); // do not relax, it needs a fence!
619         m_completed.store(true, std::memory_order_release);
620     }
621 public:
622     delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
623         : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
624     ~delegated_task() override {
625         // The destructor can be called before m_monitor is notified,
626         // because the waiting thread can be released right after m_wait_ctx is released.
627         // To close that race we wait for the m_completed signal.
628         spin_wait_until_eq(m_completed, true);
629     }
630 };
631 
632 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
633     arena* a = ta.my_arena.load(std::memory_order_relaxed);
634     __TBB_ASSERT(a != nullptr, nullptr);
635     thread_data* td = governor::get_thread_data();
636 
637     bool same_arena = td->my_arena == a;
638     std::size_t index1 = td->my_arena_index;
639     if (!same_arena) {
640         index1 = a->occupy_free_slot</*as_worker */false>(*td);
641         if (index1 == arena::out_of_arena) {
642             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
643             d1::wait_context wo(1);
644             d1::task_group_context exec_context(d1::task_group_context::isolated);
645             task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);
646 
647             delegated_task dt(d, a->my_exit_monitors, wo);
648             a->enqueue_task( dt, exec_context, *td);
649             size_t index2 = arena::out_of_arena;
650             do {
651                 a->my_exit_monitors.prepare_wait(waiter);
652                 if (!wo.continue_execution()) {
653                     a->my_exit_monitors.cancel_wait(waiter);
654                     break;
655                 }
656                 index2 = a->occupy_free_slot</*as_worker*/false>(*td);
657                 if (index2 != arena::out_of_arena) {
658                     a->my_exit_monitors.cancel_wait(waiter);
659                     nested_arena_context scope(*td, *a, index2 );
660                     r1::wait(wo, exec_context);
661                     __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
662                     break;
663                 }
664                 a->my_exit_monitors.commit_wait(waiter);
665             } while (wo.continue_execution());
666             if (index2 == arena::out_of_arena) {
667                 // Notify a waiting thread even if this thread did not enter the arena,
668                 // in case it was woken by a leaving thread but did not need to enter it.
669                 a->my_exit_monitors.notify_one(); // do not relax!
670             }
671             // process possible exception
672             auto exception = exec_context.my_exception.load(std::memory_order_acquire);
673             if (exception) {
674                 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
675                 exception->throw_self();
676             }
677             __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
678             return;
679         } // if (index1 == arena::out_of_arena)
680     } // if (!same_arena)
681 
682     context_guard_helper</*report_tasks=*/false> context_guard;
683     context_guard.set_ctx(a->my_default_ctx);
684     nested_arena_context scope(*td, *a, index1);
685 #if _WIN64
686     try {
687 #endif
688         d();
689         __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
690 #if _WIN64
691     } catch (...) {
692         context_guard.restore_default();
693         throw;
694     }
695 #endif
696 }
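
// Hedged usage sketch of the execute path above: execute() runs the functor inside the arena and
// blocks the caller; when no slot is free the work is wrapped into a delegated_task and the caller
// waits on my_exit_monitors until it completes.
#if 0
#include "oneapi/tbb/parallel_for.h"
#include "oneapi/tbb/task_arena.h"

int run_in_arena(tbb::task_arena& arena, int n) {
    int result = 0;
    arena.execute([&] {
        // execute() returns only after this functor (and the blocking parallel_for) completes,
        // and it rethrows any exception captured in the execution context.
        tbb::parallel_for(0, n, [](int) { /* ... */ });
        result = n;
    });
    return result;
}
#endif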
697 
698 void task_arena_impl::wait(d1::task_arena_base& ta) {
699     arena* a = ta.my_arena.load(std::memory_order_relaxed);
700     __TBB_ASSERT(a != nullptr, nullptr);
701     thread_data* td = governor::get_thread_data();
702     __TBB_ASSERT_EX(td, "Scheduler is not initialized");
703     __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
704     if (a->my_max_num_workers != 0) {
705         while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
706             yield();
707         }
708     }
709 }
710 
711 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
712     arena* a = nullptr;
713     if( ta ) // for special cases of ta->max_concurrency()
714         a = ta->my_arena.load(std::memory_order_relaxed);
715     else if( thread_data* td = governor::get_thread_data_if_initialized() )
716         a = td->my_arena; // the current arena if any
717 
718     if( a ) { // Get parameters from the arena
719         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
720         return a->my_num_reserved_slots + a->my_max_num_workers
721 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
722             + (a->my_local_concurrency_flag.test() ? 1 : 0)
723 #endif
724             ;
725     }
726 
727     if (ta && ta->my_max_concurrency == 1) {
728         return 1;
729     }
730 
731 #if __TBB_ARENA_BINDING
732     if (ta) {
733         d1::constraints arena_constraints = d1::constraints{}
734             .set_numa_id(ta->my_numa_id)
735             .set_core_type(ta->core_type())
736             .set_max_threads_per_core(ta->max_threads_per_core());
737         return (int)default_concurrency(arena_constraints);
738     }
739 #endif /*__TBB_ARENA_BINDING*/
740 
741     __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
742     return int(governor::default_num_threads());
743 }
744 
745 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
746     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
747     thread_data* tls = governor::get_thread_data();
748     assert_pointers_valid(tls, tls->my_task_dispatcher);
749     task_dispatcher* dispatcher = tls->my_task_dispatcher;
750     isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
751     try_call([&] {
752         // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
753         isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
754         // Save the current isolation value and set new one
755         previous_isolation = dispatcher->set_isolation(current_isolation);
756         // Isolation within this callable
757         d();
758     }).on_completion([&] {
759         __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
760         dispatcher->set_isolation(previous_isolation);
761     });
762 }
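
// Hedged usage sketch of the isolation entry point above: this_task_arena::isolate() routes here and
// tags the current dispatcher, so tasks stolen while waiting inside the callable must carry the same
// isolation tag as the callable itself.
#if 0
#include "oneapi/tbb/parallel_for.h"
#include "oneapi/tbb/task_arena.h"

void isolated_nested_loop() {
    tbb::parallel_for(0, 8, [](int) {
        tbb::this_task_arena::isolate([] {
            // While this thread waits for the inner loop, it will not pick up tasks from the
            // outer loop; only work with this isolation tag can run on its stack.
            tbb::parallel_for(0, 8, [](int) { /* ... */ });
        });
    });
}
#endif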
763 
764 } // namespace r1
765 } // namespace detail
766 } // namespace tbb
767