xref: /oneTBB/src/tbb/arena.cpp (revision 478de5b1)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "task_dispatcher.h"
18 #include "governor.h"
19 #include "arena.h"
20 #include "itt_notify.h"
21 #include "semaphore.h"
22 #include "waiters.h"
23 #include "oneapi/tbb/detail/_task.h"
24 #include "oneapi/tbb/info.h"
25 #include "oneapi/tbb/tbb_allocator.h"
26 
27 #include <atomic>
28 #include <cstring>
29 #include <functional>
30 
31 namespace tbb {
32 namespace detail {
33 namespace r1 {
34 
35 #if __TBB_ARENA_BINDING
36 class numa_binding_observer : public tbb::task_scheduler_observer {
37     binding_handler* my_binding_handler;
38 public:
39     numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
40         : task_scheduler_observer(*ta)
41         , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
42     {}
43 
44     void on_scheduler_entry( bool ) override {
45         apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
46     }
47 
48     void on_scheduler_exit( bool ) override {
49         restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
50     }
51 
52     ~numa_binding_observer(){
53         destroy_binding_handler(my_binding_handler);
54     }
55 };
56 
57 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
58     numa_binding_observer* binding_observer = nullptr;
59     if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
60         binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
61         __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
62         binding_observer->observe(true);
63     }
64     return binding_observer;
65 }
66 
67 void destroy_binding_observer( numa_binding_observer* binding_observer ) {
68     __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
69     binding_observer->observe(false);
70     binding_observer->~numa_binding_observer();
71     deallocate_memory(binding_observer);
72 }
73 #endif /*__TBB_ARENA_BINDING*/
74 
75 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
76     if ( lower >= upper ) return out_of_arena;
77     // Start the search for an empty slot from the one we occupied last time
78     std::size_t index = tls.my_arena_index;
79     if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
80     __TBB_ASSERT( index >= lower && index < upper, NULL );
81     // Find a free slot
82     for ( std::size_t i = index; i < upper; ++i )
83         if (my_slots[i].try_occupy()) return i;
84     for ( std::size_t i = lower; i < index; ++i )
85         if (my_slots[i].try_occupy()) return i;
86     return out_of_arena;
87 }
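
// Illustrative walk-through of the search above (not part of the library): with lower == 1,
// upper == 4 and a previously occupied index of 2, the two loops probe slots 2, 3 and then
// wrap around to 1, returning the first slot whose try_occupy() succeeds, or out_of_arena if
// every probe fails.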
88 
89 template <bool as_worker>
90 std::size_t arena::occupy_free_slot(thread_data& tls) {
91     // Firstly, external threads try to occupy reserved slots
92     std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
93     if ( index == out_of_arena ) {
94         // Secondly, all threads try to occupy all non-reserved slots
95         index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
96         // Likely this arena is already saturated
97         if ( index == out_of_arena )
98             return out_of_arena;
99     }
100 
101     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
102     return index;
103 }
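
// Worked example for the two-phase policy above (illustrative; assume my_num_reserved_slots == 1
// and my_num_slots == 4): a worker (as_worker == true) only searches the non-reserved range
// [1, 4), while an external thread first tries the reserved range [0, 1) and falls back to
// [1, 4) if that fails. On success, my_limit is raised to index + 1 so that other threads scan
// far enough to notice the newly occupied slot.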
104 
105 std::uintptr_t arena::calculate_stealing_threshold() {
106     stack_anchor_type anchor;
107     return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
108 }
109 
110 void arena::process(thread_data& tls) {
111     governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
112     __TBB_ASSERT( is_alive(my_guard), nullptr);
113     __TBB_ASSERT( my_num_slots > 1, nullptr);
114 
115     std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
116     if (index == out_of_arena) {
117         on_thread_leaving<ref_worker>();
118         return;
119     }
120     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
121     tls.attach_arena(*this, index);
122     // The worker thread enters the dispatch loop to look for work
123     tls.my_inbox.set_is_idle(true);
124     if (tls.my_arena_slot->is_task_pool_published()) {
125         tls.my_inbox.set_is_idle(false);
126     }
127 
128     task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
129     task_disp.set_stealing_threshold(calculate_stealing_threshold());
130     __TBB_ASSERT(task_disp.can_steal(), nullptr);
131     tls.attach_task_dispatcher(task_disp);
132 
133     __TBB_ASSERT( !tls.my_last_observer, "There must be no notified local observers when entering the arena" );
134     my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
135 
136     // Waiting on special object tied to this arena
137     outermost_worker_waiter waiter(*this);
138     d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
139     // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
140     // attached to it.
141     tls.my_inbox.set_is_idle(true);
142 
143     __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
144     __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
145     __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);
146 
147     my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
148     tls.my_last_observer = nullptr;
149 
150     task_disp.set_stealing_threshold(0);
151     tls.detach_task_dispatcher();
152 
153     // Arena slot detach (arena may be used in market::process)
154     // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
155     tls.my_arena_slot->release();
156     tls.my_arena_slot = nullptr;
157     tls.my_inbox.detach();
158     __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
159     __TBB_ASSERT(is_alive(my_guard), nullptr);
160 
161     // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
162     // for the arena to be temporarily left unpopulated by threads. See comments in
163     // arena::on_thread_leaving() for more details.
164     on_thread_leaving<ref_worker>();
165     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching for an arena to join");
166 }
167 
168 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
169 {
170     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
171     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
172     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
173     my_market = &m;
174     my_limit = 1;
175     // Two slots are mandatory: one for the external thread and one for a worker (required to support starvation-resistant tasks).
176     my_num_slots = num_arena_slots(num_slots);
177     my_num_reserved_slots = num_reserved_slots;
178     my_max_num_workers = num_slots-num_reserved_slots;
179     my_priority_level = priority_level;
180     my_references = ref_external; // accounts for the external thread
181     my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
182     my_observers.my_arena = this;
183     my_co_cache.init(4 * num_slots);
184     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
185     // Initialize the default context. It should be allocated before task_dispatcher construction.
186     my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
187         d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
188     // Construct slots. Mark internal synchronization elements for the tools.
189     task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
190     for( unsigned i = 0; i < my_num_slots; ++i ) {
191         // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
192         __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
193         __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
194         mailbox(i).construct();
195         my_slots[i].init_task_streams(i);
196         my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
197         my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
198     }
199     my_fifo_task_stream.initialize(my_num_slots);
200     my_resume_task_stream.initialize(my_num_slots);
201 #if __TBB_PREVIEW_CRITICAL_TASKS
202     my_critical_task_stream.initialize(my_num_slots);
203 #endif
204 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
205     my_local_concurrency_requests = 0;
206     my_local_concurrency_flag.clear();
207     my_global_concurrency_mode.store(false, std::memory_order_relaxed);
208 #endif
209 }
210 
211 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
212                               unsigned priority_level )
213 {
214     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
215     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
216     __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
217     std::size_t n = allocation_size(num_arena_slots(num_slots));
218     unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
219     // Zero all slots to indicate that they are empty
220     std::memset( storage, 0, n );
221     return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
222         arena(m, num_slots, num_reserved_slots, priority_level);
223 }
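
// Resulting layout of the single cache-aligned allocation made above (a sketch; N stands for
// num_arena_slots(num_slots)):
//
//     [ mail_outbox x N ][ arena_base | arena_slot x N ][ task_dispatcher x N ]
//                        ^ the arena object is constructed here
//
// mailbox(i) indexes backwards from the arena pointer into the leading region (which is why
// free_arena() takes &mailbox(my_num_slots - 1) as the start of the storage), and the arena
// constructor places one task_dispatcher per slot right behind the slot array via base_td_pointer.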
224 
225 void arena::free_arena () {
226     __TBB_ASSERT( is_alive(my_guard), NULL );
227     __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
228     __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
229     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
230                   "Inconsistent state of a dying arena" );
231 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
232     __TBB_ASSERT( !my_global_concurrency_mode, NULL );
233 #endif
234     poison_value( my_guard );
235     std::intptr_t drained = 0;
236     for ( unsigned i = 0; i < my_num_slots; ++i ) {
237         // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
238         // TODO: understand the assertion and modify
239         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
240         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
241         my_slots[i].free_task_pool();
242         drained += mailbox(i).drain();
243         my_slots[i].my_default_task_dispatcher->~task_dispatcher();
244     }
245     __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
246     __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
247     // Cleanup coroutines/schedulers cache
248     my_co_cache.cleanup();
249     my_default_ctx->~task_group_context();
250     cache_aligned_deallocate(my_default_ctx);
251 #if __TBB_PREVIEW_CRITICAL_TASKS
252     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
253 #endif
254     // remove an internal reference
255     my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
256     if ( !my_observers.empty() ) {
257         my_observers.clear();
258     }
259     void* storage  = &mailbox(my_num_slots-1);
260     __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
261     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
262     this->~arena();
263 #if TBB_USE_ASSERT > 1
264     std::memset( storage, 0, allocation_size(my_num_slots) );
265 #endif /* TBB_USE_ASSERT */
266     cache_aligned_deallocate( storage );
267 }
268 
269 bool arena::has_enqueued_tasks() {
270     return !my_fifo_task_stream.empty();
271 }
272 
273 bool arena::is_out_of_work() {
274 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
275     if (my_local_concurrency_flag.try_clear_if([this] {
276         return !has_enqueued_tasks();
277     })) {
278         my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
279     }
280 #endif
281 
282     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
283     switch (my_pool_state.load(std::memory_order_acquire)) {
284     case SNAPSHOT_EMPTY:
285         return true;
286     case SNAPSHOT_FULL: {
287         // Use a unique id for "busy" in order to avoid ABA problems.
288         const pool_state_t busy = pool_state_t(&busy);
289         // Helper for CAS execution
290         pool_state_t expected_state;
291 
292         // Request permission to take snapshot
293         expected_state = SNAPSHOT_FULL;
294         if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
295             // Got permission. Take the snapshot.
296             // NOTE: This is not a lock, as the state can be set to FULL at
297             //       any moment by a thread that spawns/enqueues new task.
298             std::size_t n = my_limit.load(std::memory_order_acquire);
299             // Make local copies of volatile parameters. Any change to them during
300             // the snapshot-taking procedure invalidates the attempt and returns
301             // this thread to the dispatch loop.
302             std::size_t k;
303             for (k = 0; k < n; ++k) {
304                 if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
305                     my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
306                 {
307                     // The k-th primary task pool is non-empty, i.e. it contains tasks.
308                     break;
309                 }
310                 if (my_pool_state.load(std::memory_order_acquire) != busy)
311                     return false; // the work was published
312             }
313             bool work_absent = k == n;
314             // Test and test-and-set.
315             if (my_pool_state.load(std::memory_order_acquire) == busy) {
316                 bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
317 #if __TBB_PREVIEW_CRITICAL_TASKS
318                 no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
319 #endif
320                 work_absent = work_absent && no_stream_tasks;
321                 if (work_absent) {
322                     // Save the current demand value before setting SNAPSHOT_EMPTY
323                     // to avoid a race with advertise_new_work.
324                     int current_demand = (int)my_max_num_workers;
325                     expected_state = busy;
326                     if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
327                         // This thread transitioned pool to empty state, and thus is
328                         // responsible for telling the market that there is no work to do.
329                         my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
330                         return true;
331                     }
332                     return false;
333                 }
334                 // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
335                 expected_state = busy;
336                 my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
337             }
338         }
339         return false;
340     }
341     default:
342         // Another thread is taking a snapshot.
343         return false;
344     }
345 }
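
// Summary of the snapshot protocol above (descriptive only, no additional logic):
//
//     SNAPSHOT_FULL --CAS--> busy --CAS--> SNAPSHOT_EMPTY   // no work found: worker demand withdrawn
//                              \--CAS--> SNAPSHOT_FULL      // work observed: transition undone
//
// `busy` is the address of a stack variable, so every snapshot attempt uses a unique token; if any
// other thread changes my_pool_state while the snapshot is being taken, the final compare_exchange
// fails and the attempt is abandoned, which is how the ABA problem mentioned above is avoided.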
346 
347 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
348     task_group_context_impl::bind_to(ctx, &td);
349     task_accessor::context(t) = &ctx;
350     task_accessor::isolation(t) = no_isolation;
351     my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
352     advertise_new_work<work_enqueued>();
353 }
354 
355 } // namespace r1
356 } // namespace detail
357 } // namespace tbb
358 
359 // Enable task_arena.h
360 #include "oneapi/tbb/task_arena.h" // task_arena_base
361 
362 namespace tbb {
363 namespace detail {
364 namespace r1 {
365 
366 #if TBB_USE_ASSERT
367 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
368     bool is_arena_priority_correct =
369         a_priority == tbb::task_arena::priority::high   ||
370         a_priority == tbb::task_arena::priority::normal ||
371         a_priority == tbb::task_arena::priority::low;
372     __TBB_ASSERT( is_arena_priority_correct,
373                   "Task arena priority should be equal to one of the predefined values." );
374 }
375 #else
376 void assert_arena_priority_valid( tbb::task_arena::priority ) {}
377 #endif
378 
379 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
380     assert_arena_priority_valid( a_priority );
381     return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
382 }
383 
384 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
385     auto priority = tbb::task_arena::priority(
386         (market::num_priority_levels - priority_level) * d1::priority_stride
387     );
388     assert_arena_priority_valid( priority );
389     return priority;
390 }
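
// Worked example of the two mappings above (assuming market::num_priority_levels == 3 and the
// public priorities defined as low == 1 * d1::priority_stride, normal == 2 * d1::priority_stride,
// high == 3 * d1::priority_stride):
//
//     arena_priority_level(priority::high)   == 3 - 3 == 0   // the most urgent level
//     arena_priority_level(priority::normal) == 3 - 2 == 1
//     arena_priority_level(priority::low)    == 3 - 1 == 2
//
// arena_priority() inverts this, e.g. arena_priority(1) == priority::normal.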
391 
392 struct task_arena_impl {
393     static void initialize(d1::task_arena_base&);
394     static void terminate(d1::task_arena_base&);
395     static bool attach(d1::task_arena_base&);
396     static void execute(d1::task_arena_base&, d1::delegate_base&);
397     static void wait(d1::task_arena_base&);
398     static int max_concurrency(const d1::task_arena_base*);
399     static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
400 };
401 
402 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
403     task_arena_impl::initialize(ta);
404 }
405 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
406     task_arena_impl::terminate(ta);
407 }
408 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
409     return task_arena_impl::attach(ta);
410 }
411 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
412     task_arena_impl::execute(ta, d);
413 }
414 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
415     task_arena_impl::wait(ta);
416 }
417 
418 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
419     return task_arena_impl::max_concurrency(ta);
420 }
421 
422 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
423     task_arena_impl::enqueue(t, nullptr, ta);
424 }
425 
426 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
427     task_arena_impl::enqueue(t, &ctx, ta);
428 }
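
// Minimal usage sketch for the exported entry points above (illustrative only; user code reaches
// them through the tbb::task_arena wrapper, which packages the callable into a d1::task; the
// function names used as work below are placeholders):
//
//     tbb::task_arena ta(4);                          // initialize() creates the arena
//     ta.enqueue([] { background_work(); });          // ends up in enqueue(d1::task&, d1::task_arena_base*)
//     ta.execute([] { foreground_work(); });          // ends up in execute(d1::task_arena_base&, d1::delegate_base&)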
429 
430 void task_arena_impl::initialize(d1::task_arena_base& ta) {
431     governor::one_time_init();
432     if (ta.my_max_concurrency < 1) {
433 #if __TBB_ARENA_BINDING
434 
435 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
436         d1::constraints arena_constraints = d1::constraints{}
437             .set_core_type(ta.core_type())
438             .set_max_threads_per_core(ta.max_threads_per_core())
439             .set_numa_id(ta.my_numa_id);
440         ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
441 #else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
442         ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
443 #endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
444 
445 #else /*!__TBB_ARENA_BINDING*/
446         ta.my_max_concurrency = (int)governor::default_num_threads();
447 #endif /*!__TBB_ARENA_BINDING*/
448     }
449 
450     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
451     unsigned priority_level = arena_priority_level(ta.my_priority);
452     arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
453     ta.my_arena.store(a, std::memory_order_release);
454     // add an internal market reference; a public reference was added in create_arena
455     market::global_market( /*is_public=*/false);
456 #if __TBB_ARENA_BINDING
457     a->my_numa_binding_observer = construct_binding_observer(
458         static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
459 #endif /*__TBB_ARENA_BINDING*/
460 }
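
// Note on the automatic-concurrency path above: my_max_concurrency < 1 corresponds to
// task_arena::automatic. In that case the value is resolved from the arena's binding constraints
// (NUMA node, core type, threads per core) when __TBB_ARENA_BINDING is enabled, and from
// governor::default_num_threads() otherwise; e.g. a default-constructed tbb::task_arena on an
// 8-thread machine would typically end up with my_max_concurrency == 8 (illustrative figure).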
461 
462 void task_arena_impl::terminate(d1::task_arena_base& ta) {
463     arena* a = ta.my_arena.load(std::memory_order_relaxed);
464     assert_pointer_valid(a);
465 #if __TBB_ARENA_BINDING
466     if(a->my_numa_binding_observer != nullptr ) {
467         destroy_binding_observer(a->my_numa_binding_observer);
468         a->my_numa_binding_observer = nullptr;
469     }
470 #endif /*__TBB_ARENA_BINDING*/
471     a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
472     a->on_thread_leaving<arena::ref_external>();
473     ta.my_arena.store(nullptr, std::memory_order_relaxed);
474 }
475 
476 bool task_arena_impl::attach(d1::task_arena_base& ta) {
477     __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
478     thread_data* td = governor::get_thread_data_if_initialized();
479     if( td && td->my_arena ) {
480         arena* a = td->my_arena;
481         // There is an active arena to attach to.
482         // It is still used by the calling thread, so it won't be destroyed right away.
483         __TBB_ASSERT(a->my_references > 0, NULL );
484         a->my_references += arena::ref_external;
485         ta.my_num_reserved_slots = a->my_num_reserved_slots;
486         ta.my_priority = arena_priority(a->my_priority_level);
487         ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
488         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
489         ta.my_arena.store(a, std::memory_order_release);
490         // increases market's ref count for task_arena
491         market::global_market( /*is_public=*/true );
492         return true;
493     }
494     return false;
495 }
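
// Illustrative usage of the attach path (a sketch; the exact public spelling of the attach tag
// type may differ between library versions):
//
//     // On a thread that is already running inside some arena:
//     tbb::task_arena mirror{ tbb::task_arena::attach{} };   // attach() above succeeds and the
//     mirror.execute([] { /* ... */ });                       // existing arena is reused
//
// If the calling thread has no arena, attach() returns false and the public wrapper is expected
// to fall back to a regular initialization instead.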
496 
497 void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
498     thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
499     assert_pointer_valid(td, "thread_data pointer should not be null");
500     arena* a = ta ?
501               ta->my_arena.load(std::memory_order_relaxed)
502             : td->my_arena
503     ;
504     assert_pointer_valid(a, "arena pointer should not be null");
505     auto* ctx = c ? c : a->my_default_ctx;
506     assert_pointer_valid(ctx, "context pointer should not be null");
507     // Is there a better place for checking the state of ctx?
508     __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
509                  "The task will not be executed because its task_group_context is cancelled.");
510     a->enqueue_task(t, *ctx, *td);
511 }
512 
513 class nested_arena_context : no_copy {
514 public:
515     nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
516         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
517     {
518         if (td.my_arena != &nested_arena) {
519             m_orig_arena = td.my_arena;
520             m_orig_slot_index = td.my_arena_index;
521             m_orig_last_observer = td.my_last_observer;
522 
523             td.detach_task_dispatcher();
524             td.attach_arena(nested_arena, slot_index);
525             if (td.my_inbox.is_idle_state(true))
526                 td.my_inbox.set_is_idle(false);
527             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
528             task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
529             td.attach_task_dispatcher(task_disp);
530 
531             // If the calling thread occupies a slot outside the external thread reserve, we need to notify the
532             // market that this arena requires one less worker.
533             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
534                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
535             }
536 
537             td.my_last_observer = nullptr;
538             // The task_arena::execute method considers each calling thread as an external thread.
539             td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
540         }
541 
542         m_task_dispatcher = td.my_task_dispatcher;
543         m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
544         m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
545         m_task_dispatcher->m_properties.critical_task_allowed = true;
546 
547         execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
548         ed_ext.context = td.my_arena->my_default_ctx;
549         ed_ext.original_slot = td.my_arena_index;
550         ed_ext.affinity_slot = d1::no_slot;
551         ed_ext.task_disp = td.my_task_dispatcher;
552         ed_ext.isolation = no_isolation;
553 
554         __TBB_ASSERT(td.my_arena_slot, nullptr);
555         __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
556         __TBB_ASSERT(td.my_task_dispatcher, nullptr);
557     }
558     ~nested_arena_context() {
559         thread_data& td = *m_task_dispatcher->m_thread_data;
560         __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
561         m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
562         m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
563         if (m_orig_arena) {
564             td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
565             td.my_last_observer = m_orig_last_observer;
566 
567             // Notify the market that this thread is releasing a slot
568             // that can now be used by a worker thread.
569             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
570                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
571             }
572 
573             td.my_task_dispatcher->set_stealing_threshold(0);
574             td.detach_task_dispatcher();
575             td.my_arena_slot->release();
576             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
577 
578             td.attach_arena(*m_orig_arena, m_orig_slot_index);
579             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
580             __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
581         }
582         td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
583     }
584 
585 private:
586     execution_data_ext    m_orig_execute_data_ext{};
587     arena*              m_orig_arena{ nullptr };
588     observer_proxy*     m_orig_last_observer{ nullptr };
589     task_dispatcher*    m_task_dispatcher{ nullptr };
590     unsigned            m_orig_slot_index{};
591     bool                m_orig_fifo_tasks_allowed{};
592     bool                m_orig_critical_task_allowed{};
593 };
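
// In short: nested_arena_context is the RAII guard used by task_arena_impl::execute() below. Its
// constructor re-parks the calling thread into the target arena's slot (swapping the arena, the
// task dispatcher and the execution data, and adjusting worker demand), and its destructor undoes
// all of that, so the caller observes its original scheduler state once execute() returns.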
594 
595 class delegated_task : public d1::task {
596     d1::delegate_base&  m_delegate;
597     concurrent_monitor& m_monitor;
598     d1::wait_context&   m_wait_ctx;
599     std::atomic<bool>   m_completed;
600     d1::task* execute(d1::execution_data& ed) override {
601         const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
602         execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
603         __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
604             "The execute data shall point to the current task dispatcher execute data");
605         __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);
606 
607         ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
608         bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
609         try_call([&] {
610             m_delegate();
611         }).on_completion([&] {
612             ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
613             ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
614         });
615 
616         finalize();
617         return nullptr;
618     }
619     d1::task* cancel(d1::execution_data&) override {
620         finalize();
621         return nullptr;
622     }
623     void finalize() {
624         m_wait_ctx.release(); // must precede the wakeup
625         m_monitor.notify([this](std::uintptr_t ctx) {
626             return ctx == std::uintptr_t(&m_delegate);
627         }); // do not relax, it needs a fence!
628         m_completed.store(true, std::memory_order_release);
629     }
630 public:
631     delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
632         : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
633     ~delegated_task() {
634         // The destructor can be called before m_monitor is notified
635         // because the waiting thread may be released as soon as m_wait_ctx is released.
636         // To close that race we wait for the m_completed signal.
637         spin_wait_until_eq(m_completed, true);
638     }
639 };
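
// In short: delegated_task is the fallback used by execute() when the calling thread cannot
// occupy a slot in the target arena. The delegate is wrapped into this task and enqueued; whatever
// thread eventually runs (or cancels) it signals completion through m_wait_ctx and wakes the
// caller waiting on the arena's exit monitor, using the delegate's address as the wakeup key.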
640 
641 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
642     arena* a = ta.my_arena.load(std::memory_order_relaxed);
643     __TBB_ASSERT(a != nullptr, nullptr);
644     thread_data* td = governor::get_thread_data();
645 
646     bool same_arena = td->my_arena == a;
647     std::size_t index1 = td->my_arena_index;
648     if (!same_arena) {
649         index1 = a->occupy_free_slot</*as_worker */false>(*td);
650         if (index1 == arena::out_of_arena) {
651             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
652             d1::wait_context wo(1);
653             d1::task_group_context exec_context(d1::task_group_context::isolated);
654             task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);
655 
656             delegated_task dt(d, a->my_exit_monitors, wo);
657             a->enqueue_task( dt, exec_context, *td);
658             size_t index2 = arena::out_of_arena;
659             do {
660                 a->my_exit_monitors.prepare_wait(waiter);
661                 if (!wo.continue_execution()) {
662                     a->my_exit_monitors.cancel_wait(waiter);
663                     break;
664                 }
665                 index2 = a->occupy_free_slot</*as_worker*/false>(*td);
666                 if (index2 != arena::out_of_arena) {
667                     a->my_exit_monitors.cancel_wait(waiter);
668                     nested_arena_context scope(*td, *a, index2 );
669                     r1::wait(wo, exec_context);
670                     __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
671                     break;
672                 }
673                 a->my_exit_monitors.commit_wait(waiter);
674             } while (wo.continue_execution());
675             if (index2 == arena::out_of_arena) {
676                 // Notify a waiting thread even if this thread did not enter the arena,
677                 // in case it was woken by a leaving thread but did not need to enter.
678                 a->my_exit_monitors.notify_one(); // do not relax!
679             }
680             // process possible exception
681             if (exec_context.my_exception) {
682                 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
683                 exec_context.my_exception->throw_self();
684             }
685             __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
686             return;
687         } // if (index1 == arena::out_of_arena)
688     } // if (!same_arena)
689 
690     context_guard_helper</*report_tasks=*/false> context_guard;
691     context_guard.set_ctx(a->my_default_ctx);
692     nested_arena_context scope(*td, *a, index1);
693 #if _WIN64
694     try {
695 #endif
696         d();
697         __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
698 #if _WIN64
699     } catch (...) {
700         context_guard.restore_default();
701         throw;
702     }
703 #endif
704 }
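
// The two execution paths of execute() above, summarized:
//   1. Same arena, or a free slot was occupied: the delegate runs inline under a
//      nested_arena_context bound to slot index1.
//   2. No slot available: a delegated_task is enqueued and the caller blocks on
//      my_exit_monitors, re-trying slot occupancy on every wakeup; any exception captured in
//      exec_context is re-thrown after the wait completes.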
705 
706 void task_arena_impl::wait(d1::task_arena_base& ta) {
707     arena* a = ta.my_arena.load(std::memory_order_relaxed);
708     __TBB_ASSERT(a != nullptr, nullptr);
709     thread_data* td = governor::get_thread_data();
710     __TBB_ASSERT_EX(td, "Scheduler is not initialized");
711     __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
712     if (a->my_max_num_workers != 0) {
713         while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
714             yield();
715         }
716     }
717 }
718 
719 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
720     arena* a = nullptr;
721     if( ta ) // for special cases of ta->max_concurrency()
722         a = ta->my_arena.load(std::memory_order_relaxed);
723     else if( thread_data* td = governor::get_thread_data_if_initialized() )
724         a = td->my_arena; // the current arena if any
725 
726     if( a ) { // Get parameters from the arena
727         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
728         return a->my_num_reserved_slots + a->my_max_num_workers
729 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
730             + (a->my_local_concurrency_flag.test() ? 1 : 0)
731 #endif
732             ;
733     }
734 
735     if (ta && ta->my_max_concurrency == 1) {
736         return 1;
737     }
738 
739 #if __TBB_ARENA_BINDING
740     if (ta) {
741 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
742         d1::constraints arena_constraints = d1::constraints{}
743             .set_numa_id(ta->my_numa_id)
744             .set_core_type(ta->core_type())
745             .set_max_threads_per_core(ta->max_threads_per_core());
746         return (int)default_concurrency(arena_constraints);
747 #else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
748         return (int)default_concurrency(ta->my_numa_id);
749 #endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
750     }
751 #endif /*__TBB_ARENA_BINDING*/
752 
753     __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
754     return int(governor::default_num_threads());
755 }
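
// Worked example (illustrative): for an arena created as tbb::task_arena ta(8, 1), the arena has
// my_num_reserved_slots == 1 and my_max_num_workers == 7, so max_concurrency() reports
// 1 + 7 == 8 (plus one more while a mandatory-concurrency worker is forced in through
// my_local_concurrency_flag). Without an arena and with automatic concurrency, the call falls
// back to governor::default_num_threads().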
756 
757 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
758     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
759     thread_data* tls = governor::get_thread_data();
760     assert_pointers_valid(tls, tls->my_task_dispatcher);
761     task_dispatcher* dispatcher = tls->my_task_dispatcher;
762     isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
763     try_call([&] {
764         // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
765         isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
766         // Save the current isolation value and set a new one
767         previous_isolation = dispatcher->set_isolation(current_isolation);
768         // Isolation within this callable
769         d();
770     }).on_completion([&] {
771         __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
772         dispatcher->set_isolation(previous_isolation);
773     });
774 }
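
// Typical way this function is reached (illustrative sketch; `work` stands for any parallel
// algorithm):
//
//     tbb::this_task_arena::isolate([] {
//         work();   // tasks spawned here carry the isolation tag and are not interleaved
//     });           // with unrelated tasks taken from the outer dispatch loop
//
// A zero `isolation` argument means "derive a unique tag from the delegate's address", which is
// what the public wrapper relies on.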
775 
776 } // namespace r1
777 } // namespace detail
778 } // namespace tbb
779 
780