xref: /oneTBB/src/tbb/arena.cpp (revision b14b68a5)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "task_dispatcher.h"
18 #include "governor.h"
19 #include "arena.h"
20 #include "itt_notify.h"
21 #include "semaphore.h"
22 #include "waiters.h"
23 #include "oneapi/tbb/detail/_task.h"
24 #include "oneapi/tbb/info.h"
25 #include "oneapi/tbb/tbb_allocator.h"
26 
27 #include <atomic>
28 #include <cstring>
29 #include <functional>
30 
31 namespace tbb {
32 namespace detail {
33 namespace r1 {
34 
35 #if __TBB_ARENA_BINDING
36 class numa_binding_observer : public tbb::task_scheduler_observer {
37     binding_handler* my_binding_handler;
38 public:
39     numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
40         : task_scheduler_observer(*ta)
41         , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
42     {}
43 
44     void on_scheduler_entry( bool ) override {
45         apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
46     }
47 
48     void on_scheduler_exit( bool ) override {
49         restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
50     }
51 
52     ~numa_binding_observer() override{
53         destroy_binding_handler(my_binding_handler);
54     }
55 };
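// The observer above applies a NUMA/core-type affinity mask whenever a thread joins the
// bound arena and restores the previous mask when the thread leaves it. For reference, a
// hedged sketch of how such binding is typically requested through the public API
// (illustrative only, not part of this translation unit; the constraints interface may
// differ between oneTBB versions):
//
//     std::vector<tbb::numa_node_id> nodes = tbb::info::numa_nodes();
//     tbb::task_arena::constraints c;
//     c.numa_id = nodes[0];                  // pin the arena's threads to the first NUMA node
//     tbb::task_arena numa_arena(c);
//     numa_arena.execute([] { /* work bound to that node */ });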
56 
57 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
58     numa_binding_observer* binding_observer = nullptr;
59     if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
60         binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
61         __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
62         binding_observer->observe(true);
63     }
64     return binding_observer;
65 }
66 
67 void destroy_binding_observer( numa_binding_observer* binding_observer ) {
68     __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
69     binding_observer->observe(false);
70     binding_observer->~numa_binding_observer();
71     deallocate_memory(binding_observer);
72 }
73 #endif /*__TBB_ARENA_BINDING*/
74 
75 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
76     if ( lower >= upper ) return out_of_arena;
77     // Start the search for an empty slot from the one we occupied last time
78     std::size_t index = tls.my_arena_index;
79     if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
80     __TBB_ASSERT( index >= lower && index < upper, nullptr);
81     // Find a free slot
82     for ( std::size_t i = index; i < upper; ++i )
83         if (my_slots[i].try_occupy()) return i;
84     for ( std::size_t i = lower; i < index; ++i )
85         if (my_slots[i].try_occupy()) return i;
86     return out_of_arena;
87 }
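// Note: the loop above is a wrap-around scan over [lower, upper): it probes [index, upper)
// first and then [lower, index). For example, with lower == 1, upper == 8 and index == 5 the
// probe order is 5, 6, 7, 1, 2, 3, 4, which keeps a thread close to the slot it used last
// while still covering the whole range.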
88 
89 template <bool as_worker>
90 std::size_t arena::occupy_free_slot(thread_data& tls) {
91     // Firstly, external threads try to occupy reserved slots
92     std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
93     if ( index == out_of_arena ) {
94         // Secondly, all threads try to occupy all non-reserved slots
95         index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
96         // Likely this arena is already saturated
97         if ( index == out_of_arena )
98             return out_of_arena;
99     }
100 
101     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
102     return index;
103 }
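// Slot policy implemented above: external threads first try the reserved range
// [0, my_num_reserved_slots) and then fall back to the shared range, while workers
// (as_worker == true) go straight to the shared range [my_num_reserved_slots, my_num_slots).
// On success my_limit is raised so that stealing and snapshot loops know the upper bound of
// slots that may contain task pools.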
104 
105 std::uintptr_t arena::calculate_stealing_threshold() {
106     stack_anchor_type anchor;
107     return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
108 }
109 
110 void arena::process(thread_data& tls) {
111     governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
112     __TBB_ASSERT( is_alive(my_guard), nullptr);
113     __TBB_ASSERT( my_num_slots > 1, nullptr);
114 
115     std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
116     if (index == out_of_arena) {
117         on_thread_leaving<ref_worker>();
118         return;
119     }
120     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
121     tls.attach_arena(*this, index);
122     // The worker thread enters the dispatch loop to look for work
123     tls.my_inbox.set_is_idle(true);
124     if (tls.my_arena_slot->is_task_pool_published()) {
125         tls.my_inbox.set_is_idle(false);
126     }
127 
128     task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
129     tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
130     __TBB_ASSERT(task_disp.can_steal(), nullptr);
131 
132     __TBB_ASSERT( !tls.my_last_observer, "There must be no notified local observers when entering the arena" );
133     my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
134 
135     // Wait on a special object tied to this arena
136     outermost_worker_waiter waiter(*this);
137     d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
138     // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
139     // attached to it.
140     tls.my_inbox.set_is_idle(true);
141 
142     __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
143     __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
144     __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);
145 
146     my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
147     tls.my_last_observer = nullptr;
148 
149     tls.leave_task_dispatcher();
150 
151     // Arena slot detach (arena may be used in market::process)
152     // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
153     tls.my_arena_slot->release();
154     tls.my_arena_slot = nullptr;
155     tls.my_inbox.detach();
156     __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
157     __TBB_ASSERT(is_alive(my_guard), nullptr);
158 
159     // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
160     // that the arena is temporarily left unpopulated by threads. See comments in
161     // arena::on_thread_leaving() for more details.
162     on_thread_leaving<ref_worker>();
163     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
164 }
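// arena::process covers the whole life cycle of a worker inside an arena: occupy a slot,
// enter the task dispatcher, run local_wait_for_all() until the arena runs out of work,
// notify exit observers, detach the slot and mailbox, and finally drop the worker reference
// (which may allow the arena to be destroyed).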
165 
166 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
167 {
168     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
169     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
170     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
171     my_market = &m;
172     my_limit = 1;
173     // Two slots are mandatory: one for the external thread and one for a worker (required to support starvation-resistant tasks).
174     my_num_slots = num_arena_slots(num_slots);
175     my_num_reserved_slots = num_reserved_slots;
176     my_max_num_workers = num_slots-num_reserved_slots;
177     my_priority_level = priority_level;
178     my_references = ref_external; // accounts for the external thread
179     my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
180     my_observers.my_arena = this;
181     my_co_cache.init(4 * num_slots);
182     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
183     // Initialize the default context. It should be allocated before task_dispatcher construction.
184     my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
185         d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
186     // Construct slots. Mark internal synchronization elements for the tools.
187     task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
188     for( unsigned i = 0; i < my_num_slots; ++i ) {
189         // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
190         __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
191         __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
192         mailbox(i).construct();
193         my_slots[i].init_task_streams(i);
194         my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
195         my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
196     }
197     my_fifo_task_stream.initialize(my_num_slots);
198     my_resume_task_stream.initialize(my_num_slots);
199 #if __TBB_PREVIEW_CRITICAL_TASKS
200     my_critical_task_stream.initialize(my_num_slots);
201 #endif
202 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
203     my_local_concurrency_requests = 0;
204     my_local_concurrency_flag.clear();
205     my_global_concurrency_mode.store(false, std::memory_order_relaxed);
206 #endif
207 }
208 
209 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
210                               unsigned priority_level )
211 {
212     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
213     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
214     __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
215     std::size_t n = allocation_size(num_arena_slots(num_slots));
216     unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
217     // Zero all slots to indicate that they are empty
218     std::memset( storage, 0, n );
219     return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
220         arena(m, num_slots, num_reserved_slots, priority_level);
221 }
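// Memory layout produced by allocate_arena, as a sketch inferred from the asserts and the
// pointer arithmetic here and in the arena constructor (exact padding is handled by
// num_arena_slots/allocation_size):
//
//     [ mail_outbox x num_slots | arena (arena_base + slot 0) | arena_slot x (num_slots - 1) | task_dispatcher x num_slots ]
//
// The mailboxes precede the arena object, so mailbox(i) is apparently addressed at a negative
// offset from `this` (free_arena recovers the allocation start as &mailbox(my_num_slots - 1)).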
222 
223 void arena::free_arena () {
224     __TBB_ASSERT( is_alive(my_guard), nullptr);
225     __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
226     __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
227     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
228                   "Inconsistent state of a dying arena" );
229 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
230     __TBB_ASSERT( !my_global_concurrency_mode, nullptr);
231 #endif
232 #if __TBB_ARENA_BINDING
233     if (my_numa_binding_observer != nullptr) {
234         destroy_binding_observer(my_numa_binding_observer);
235         my_numa_binding_observer = nullptr;
236     }
237 #endif /*__TBB_ARENA_BINDING*/
238     poison_value( my_guard );
239     for ( unsigned i = 0; i < my_num_slots; ++i ) {
240         // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
241         // TODO: understand the assertion and modify
242         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
243         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
244         my_slots[i].free_task_pool();
245         mailbox(i).drain();
246         my_slots[i].my_default_task_dispatcher->~task_dispatcher();
247     }
248     __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
249     __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
250     // Clean up the coroutine/scheduler cache
251     my_co_cache.cleanup();
252     my_default_ctx->~task_group_context();
253     cache_aligned_deallocate(my_default_ctx);
254 #if __TBB_PREVIEW_CRITICAL_TASKS
255     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
256 #endif
257     // remove an internal reference
258     my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
259 
260     // clear() enforces synchronization with observe(false)
261     my_observers.clear();
262 
263     void* storage  = &mailbox(my_num_slots-1);
264     __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
265     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, nullptr);
266     this->~arena();
267 #if TBB_USE_ASSERT > 1
268     std::memset( storage, 0, allocation_size(my_num_slots) );
269 #endif /* TBB_USE_ASSERT */
270     cache_aligned_deallocate( storage );
271 }
272 
273 bool arena::has_enqueued_tasks() {
274     return !my_fifo_task_stream.empty();
275 }
276 
277 bool arena::is_out_of_work() {
278 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
279     if (my_local_concurrency_flag.try_clear_if([this] {
280         return !has_enqueued_tasks();
281     })) {
282         my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
283     }
284 #endif
285 
286     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
287     switch (my_pool_state.load(std::memory_order_acquire)) {
288     case SNAPSHOT_EMPTY:
289         return true;
290     case SNAPSHOT_FULL: {
291         // Use unique id for "busy" in order to avoid ABA problems.
292         const pool_state_t busy = pool_state_t(&busy);
293         // Helper for CAS execution
294         pool_state_t expected_state;
295 
296         // Request permission to take snapshot
297         expected_state = SNAPSHOT_FULL;
298         if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
299             // Got permission. Take the snapshot.
300             // NOTE: This is not a lock, as the state can be set to FULL at
301             //       any moment by a thread that spawns/enqueues new task.
302             std::size_t n = my_limit.load(std::memory_order_acquire);
303             // Make local copies of the concurrently changing parameters. Any change to them
304             // during the snapshot-taking procedure invalidates the attempt and returns
305             // this thread to the dispatch loop.
306             std::size_t k;
307             for (k = 0; k < n; ++k) {
308                 if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
309                     my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
310                 {
311                     // The k-th primary task pool is non-empty, i.e. it contains tasks.
312                     break;
313                 }
314                 if (my_pool_state.load(std::memory_order_acquire) != busy)
315                     return false; // the work was published
316             }
317             bool work_absent = k == n;
318             // Test and test-and-set.
319             if (my_pool_state.load(std::memory_order_acquire) == busy) {
320                 bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
321 #if __TBB_PREVIEW_CRITICAL_TASKS
322                 no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
323 #endif
324                 work_absent = work_absent && no_stream_tasks;
325                 if (work_absent) {
326                     // Save the current demand value before setting SNAPSHOT_EMPTY
327                     // to avoid a race with advertise_new_work.
328                     int current_demand = (int)my_max_num_workers;
329                     expected_state = busy;
330                     if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
331                         // This thread transitioned pool to empty state, and thus is
332                         // responsible for telling the market that there is no work to do.
333                         my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
334                         return true;
335                     }
336                     return false;
337                 }
338                 // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
339                 expected_state = busy;
340                 my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
341             }
342         }
343         return false;
344     }
345     default:
346         // Another thread is taking a snapshot.
347         return false;
348     }
349 }
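// The pool-state protocol used above, in short: SNAPSHOT_FULL means work may be available,
// SNAPSHOT_EMPTY means the arena has told the market there is no work, and a unique transient
// "busy" value (the address of a local variable, to avoid ABA) marks a snapshot in progress.
// A thread that spawns or enqueues new work sets the state back to SNAPSHOT_FULL (see the NOTE
// above), which invalidates a concurrent snapshot attempt and makes this function return false.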
350 
351 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
352     task_group_context_impl::bind_to(ctx, &td);
353     task_accessor::context(t) = &ctx;
354     task_accessor::isolation(t) = no_isolation;
355     my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
356     advertise_new_work<work_enqueued>();
357 }
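// Hedged user-level sketch of what reaches enqueue_task (illustrative only; the public entry
// point is tbb::task_arena::enqueue, which funnels into the exported r1::enqueue functions
// defined below):
//
//     tbb::task_arena arena;
//     arena.enqueue([] { /* fire-and-forget work executed by the arena's threads */ });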
358 
359 } // namespace r1
360 } // namespace detail
361 } // namespace tbb
362 
363 // Enable task_arena.h
364 #include "oneapi/tbb/task_arena.h" // task_arena_base
365 
366 namespace tbb {
367 namespace detail {
368 namespace r1 {
369 
370 #if TBB_USE_ASSERT
371 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
372     bool is_arena_priority_correct =
373         a_priority == tbb::task_arena::priority::high   ||
374         a_priority == tbb::task_arena::priority::normal ||
375         a_priority == tbb::task_arena::priority::low;
376     __TBB_ASSERT( is_arena_priority_correct,
377                   "Task arena priority should be equal to one of the predefined values." );
378 }
379 #else
380 void assert_arena_priority_valid( tbb::task_arena::priority ) {}
381 #endif
382 
383 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
384     assert_arena_priority_valid( a_priority );
385     return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
386 }
387 
388 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
389     auto priority = tbb::task_arena::priority(
390         (market::num_priority_levels - priority_level) * d1::priority_stride
391     );
392     assert_arena_priority_valid( priority );
393     return priority;
394 }
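// Round-trip note (hedged): arena_priority_level maps a public priority p to the internal
// level market::num_priority_levels - p / d1::priority_stride, and arena_priority inverts that
// mapping, so arena_priority(arena_priority_level(p)) == p for the three predefined priorities
// (assuming they are exact multiples of d1::priority_stride, which the assert in arena_priority
// relies on). Smaller level numbers correspond to higher priorities.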
395 
396 struct task_arena_impl {
397     static void initialize(d1::task_arena_base&);
398     static void terminate(d1::task_arena_base&);
399     static bool attach(d1::task_arena_base&);
400     static void execute(d1::task_arena_base&, d1::delegate_base&);
401     static void wait(d1::task_arena_base&);
402     static int max_concurrency(const d1::task_arena_base*);
403     static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
404 };
405 
406 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
407     task_arena_impl::initialize(ta);
408 }
409 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
410     task_arena_impl::terminate(ta);
411 }
412 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
413     return task_arena_impl::attach(ta);
414 }
415 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
416     task_arena_impl::execute(ta, d);
417 }
418 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
419     task_arena_impl::wait(ta);
420 }
421 
422 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
423     return task_arena_impl::max_concurrency(ta);
424 }
425 
426 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
427     task_arena_impl::enqueue(t, nullptr, ta);
428 }
429 
430 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
431     task_arena_impl::enqueue(t, &ctx, ta);
432 }
433 
434 void task_arena_impl::initialize(d1::task_arena_base& ta) {
435     // Enforce global market initialization to properly initialize soft limit
436     (void)governor::get_thread_data();
437     if (ta.my_max_concurrency < 1) {
438 #if __TBB_ARENA_BINDING
439 
440 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
441         d1::constraints arena_constraints = d1::constraints{}
442             .set_core_type(ta.core_type())
443             .set_max_threads_per_core(ta.max_threads_per_core())
444             .set_numa_id(ta.my_numa_id);
445         ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
446 #else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
447         ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
448 #endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
449 
450 #else /*!__TBB_ARENA_BINDING*/
451         ta.my_max_concurrency = (int)governor::default_num_threads();
452 #endif /*!__TBB_ARENA_BINDING*/
453     }
454 
455     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
456     unsigned priority_level = arena_priority_level(ta.my_priority);
457     arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
458     ta.my_arena.store(a, std::memory_order_release);
459     // add an internal market reference; a public reference was added in create_arena
460     market::global_market( /*is_public=*/false);
461 #if __TBB_ARENA_BINDING
462     a->my_numa_binding_observer = construct_binding_observer(
463         static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
464 #endif /*__TBB_ARENA_BINDING*/
465 }
466 
467 void task_arena_impl::terminate(d1::task_arena_base& ta) {
468     arena* a = ta.my_arena.load(std::memory_order_relaxed);
469     assert_pointer_valid(a);
470     a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
471     a->on_thread_leaving<arena::ref_external>();
472     ta.my_arena.store(nullptr, std::memory_order_relaxed);
473 }
474 
475 bool task_arena_impl::attach(d1::task_arena_base& ta) {
476     __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
477     thread_data* td = governor::get_thread_data_if_initialized();
478     if( td && td->my_arena ) {
479         arena* a = td->my_arena;
480         // There is an active arena to attach to.
481         // It is still used by this thread, so it won't be destroyed right away.
482         __TBB_ASSERT(a->my_references > 0, nullptr);
483         a->my_references += arena::ref_external;
484         ta.my_num_reserved_slots = a->my_num_reserved_slots;
485         ta.my_priority = arena_priority(a->my_priority_level);
486         ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
487         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, nullptr);
488         ta.my_arena.store(a, std::memory_order_release);
489         // increase the market's reference count on behalf of the task_arena
490         market::global_market( /*is_public=*/true );
491         return true;
492     }
493     return false;
494 }
495 
496 void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
497     thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
498     assert_pointer_valid(td, "thread_data pointer should not be null");
499     arena* a = ta ?
500               ta->my_arena.load(std::memory_order_relaxed)
501             : td->my_arena
502     ;
503     assert_pointer_valid(a, "arena pointer should not be null");
504     auto* ctx = c ? c : a->my_default_ctx;
505     assert_pointer_valid(ctx, "context pointer should not be null");
506     // TODO: is there a better place to check the state of ctx?
507     __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
508                  "The task will not be executed because its task_group_context is cancelled.");
509     a->enqueue_task(t, *ctx, *td);
510 }
511 
512 class nested_arena_context : no_copy {
513 public:
514     nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
515         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
516     {
517         if (td.my_arena != &nested_arena) {
518             m_orig_arena = td.my_arena;
519             m_orig_slot_index = td.my_arena_index;
520             m_orig_last_observer = td.my_last_observer;
521 
522             td.detach_task_dispatcher();
523             td.attach_arena(nested_arena, slot_index);
524             if (td.my_inbox.is_idle_state(true))
525                 td.my_inbox.set_is_idle(false);
526             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
527             td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);
528 
529             // If the calling thread occupies a slot outside of the external thread reserve, we need to notify the
530             // market that this arena requires one worker less.
531             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
532                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
533             }
534 
535             td.my_last_observer = nullptr;
536             // The task_arena::execute method considers each calling thread as an external thread.
537             td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
538         }
539 
540         m_task_dispatcher = td.my_task_dispatcher;
541         m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
542         m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
543         m_task_dispatcher->m_properties.critical_task_allowed = true;
544 
545         execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
546         ed_ext.context = td.my_arena->my_default_ctx;
547         ed_ext.original_slot = td.my_arena_index;
548         ed_ext.affinity_slot = d1::no_slot;
549         ed_ext.task_disp = td.my_task_dispatcher;
550         ed_ext.isolation = no_isolation;
551 
552         __TBB_ASSERT(td.my_arena_slot, nullptr);
553         __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
554         __TBB_ASSERT(td.my_task_dispatcher, nullptr);
555     }
556     ~nested_arena_context() {
557         thread_data& td = *m_task_dispatcher->m_thread_data;
558         __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
559         m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
560         m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
561         if (m_orig_arena) {
562             td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
563             td.my_last_observer = m_orig_last_observer;
564 
565             // Notify the market that this thread is releasing one slot
566             // that can be used by a worker thread.
567             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
568                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
569             }
570 
571             td.leave_task_dispatcher();
572             td.my_arena_slot->release();
573             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
574 
575             td.attach_arena(*m_orig_arena, m_orig_slot_index);
576             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
577             __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
578         }
579         td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
580     }
581 
582 private:
583     execution_data_ext    m_orig_execute_data_ext{};
584     arena*              m_orig_arena{ nullptr };
585     observer_proxy*     m_orig_last_observer{ nullptr };
586     task_dispatcher*    m_task_dispatcher{ nullptr };
587     unsigned            m_orig_slot_index{};
588     bool                m_orig_fifo_tasks_allowed{};
589     bool                m_orig_critical_task_allowed{};
590 };
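// nested_arena_context is an RAII helper: the constructor moves the calling thread into the
// target arena (occupying the given slot, switching task dispatchers, adjusting the demand and
// notifying entry observers), and the destructor undoes all of that in reverse order and
// re-attaches the thread to its original arena and task dispatcher.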
591 
592 class delegated_task : public d1::task {
593     d1::delegate_base&  m_delegate;
594     concurrent_monitor& m_monitor;
595     d1::wait_context&   m_wait_ctx;
596     std::atomic<bool>   m_completed;
597     d1::task* execute(d1::execution_data& ed) override {
598         const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
599         execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
600         __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
601             "The execute data shall point to the current task dispatcher execute data");
602         __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);
603 
604         ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
605         bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
606         try_call([&] {
607             m_delegate();
608         }).on_completion([&] {
609             ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
610             ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
611         });
612 
613         finalize();
614         return nullptr;
615     }
616     d1::task* cancel(d1::execution_data&) override {
617         finalize();
618         return nullptr;
619     }
620     void finalize() {
621         m_wait_ctx.release(); // must precede the wakeup
622         m_monitor.notify([this](std::uintptr_t ctx) {
623             return ctx == std::uintptr_t(&m_delegate);
624         }); // do not relax, it needs a fence!
625         m_completed.store(true, std::memory_order_release);
626     }
627 public:
628     delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
629         : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
630     ~delegated_task() override {
631         // The destructor can be called before m_monitor is notified,
632         // because the waiting thread can be released right after m_wait_ctx.release_wait.
633         // To close that race, we wait for the m_completed signal.
634         spin_wait_until_eq(m_completed, true);
635     }
636 };
637 
638 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
639     arena* a = ta.my_arena.load(std::memory_order_relaxed);
640     __TBB_ASSERT(a != nullptr, nullptr);
641     thread_data* td = governor::get_thread_data();
642 
643     bool same_arena = td->my_arena == a;
644     std::size_t index1 = td->my_arena_index;
645     if (!same_arena) {
646         index1 = a->occupy_free_slot</*as_worker */false>(*td);
647         if (index1 == arena::out_of_arena) {
648             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
649             d1::wait_context wo(1);
650             d1::task_group_context exec_context(d1::task_group_context::isolated);
651             task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);
652 
653             delegated_task dt(d, a->my_exit_monitors, wo);
654             a->enqueue_task( dt, exec_context, *td);
655             size_t index2 = arena::out_of_arena;
656             do {
657                 a->my_exit_monitors.prepare_wait(waiter);
658                 if (!wo.continue_execution()) {
659                     a->my_exit_monitors.cancel_wait(waiter);
660                     break;
661                 }
662                 index2 = a->occupy_free_slot</*as_worker*/false>(*td);
663                 if (index2 != arena::out_of_arena) {
664                     a->my_exit_monitors.cancel_wait(waiter);
665                     nested_arena_context scope(*td, *a, index2 );
666                     r1::wait(wo, exec_context);
667                     __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
668                     break;
669                 }
670                 a->my_exit_monitors.commit_wait(waiter);
671             } while (wo.continue_execution());
672             if (index2 == arena::out_of_arena) {
673                 // notify a waiting thread even if this thread did not enter arena,
674                 // in case it was woken by a leaving thread but did not need to enter
675                 a->my_exit_monitors.notify_one(); // do not relax!
676             }
677             // process possible exception
678             auto exception = exec_context.my_exception.load(std::memory_order_acquire);
679             if (exception) {
680                 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
681                 exception->throw_self();
682             }
683             __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
684             return;
685         } // if (index1 == arena::out_of_arena)
686     } // if (!same_arena)
687 
688     context_guard_helper</*report_tasks=*/false> context_guard;
689     context_guard.set_ctx(a->my_default_ctx);
690     nested_arena_context scope(*td, *a, index1);
691 #if _WIN64
692     try {
693 #endif
694         d();
695         __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
696 #if _WIN64
697     } catch (...) {
698         context_guard.restore_default();
699         throw;
700     }
701 #endif
702 }
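// Hedged user-level sketch of the path implemented by task_arena_impl::execute (illustrative
// only): if the calling thread can occupy a slot it runs the delegate inside the arena
// directly; otherwise the delegate is wrapped into a delegated_task, enqueued, and the caller
// blocks on my_exit_monitors until the task completes or a slot becomes available.
//
//     tbb::task_arena arena(4);
//     int result = 0;
//     arena.execute([&] { result = 42; });   // runs within the arena's concurrency context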
703 
704 void task_arena_impl::wait(d1::task_arena_base& ta) {
705     arena* a = ta.my_arena.load(std::memory_order_relaxed);
706     __TBB_ASSERT(a != nullptr, nullptr);
707     thread_data* td = governor::get_thread_data();
708     __TBB_ASSERT_EX(td, "Scheduler is not initialized");
709     __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
710     if (a->my_max_num_workers != 0) {
711         while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
712             yield();
713         }
714     }
715 }
716 
717 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
718     arena* a = nullptr;
719     if( ta ) // for special cases of ta->max_concurrency()
720         a = ta->my_arena.load(std::memory_order_relaxed);
721     else if( thread_data* td = governor::get_thread_data_if_initialized() )
722         a = td->my_arena; // the current arena if any
723 
724     if( a ) { // Get parameters from the arena
725         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
726         return a->my_num_reserved_slots + a->my_max_num_workers
727 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
728             + (a->my_local_concurrency_flag.test() ? 1 : 0)
729 #endif
730             ;
731     }
732 
733     if (ta && ta->my_max_concurrency == 1) {
734         return 1;
735     }
736 
737 #if __TBB_ARENA_BINDING
738     if (ta) {
739 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
740         d1::constraints arena_constraints = d1::constraints{}
741             .set_numa_id(ta->my_numa_id)
742             .set_core_type(ta->core_type())
743             .set_max_threads_per_core(ta->max_threads_per_core());
744         return (int)default_concurrency(arena_constraints);
745 #else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
746         return (int)default_concurrency(ta->my_numa_id);
747 #endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
748     }
749 #endif /*!__TBB_ARENA_BINDING*/
750 
751     __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
752     return int(governor::default_num_threads());
753 }
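// Hedged usage note: this is the backend of tbb::task_arena::max_concurrency() and, when called
// with a null pointer, of tbb::this_task_arena::max_concurrency(). For example:
//
//     tbb::task_arena arena;
//     arena.initialize();
//     int n = arena.max_concurrency();   // reserved slots + workers of this arena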
754 
755 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
756     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
757     thread_data* tls = governor::get_thread_data();
758     assert_pointers_valid(tls, tls->my_task_dispatcher);
759     task_dispatcher* dispatcher = tls->my_task_dispatcher;
760     isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
761     try_call([&] {
762         // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
763         isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
764         // Save the current isolation value and set new one
765         previous_isolation = dispatcher->set_isolation(current_isolation);
766         // Isolation within this callable
767         d();
768     }).on_completion([&] {
769         __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
770         dispatcher->set_isolation(previous_isolation);
771     });
772 }
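// Hedged user-level sketch (illustrative only): this function backs tbb::this_task_arena::isolate,
// which prevents a nested blocking call from picking up outer, unrelated tasks while it waits:
//
//     tbb::this_task_arena::isolate([] {
//         tbb::parallel_for(0, 100, [](int) { /* nested work stays isolated */ });
//     });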
773 
774 } // namespace r1
775 } // namespace detail
776 } // namespace tbb
777