xref: /oneTBB/src/tbb/arena.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "task_dispatcher.h"
18 #include "governor.h"
19 #include "arena.h"
20 #include "itt_notify.h"
21 #include "semaphore.h"
22 #include "waiters.h"
23 #include "oneapi/tbb/detail/_task.h"
24 #include "oneapi/tbb/info.h"
25 #include "oneapi/tbb/tbb_allocator.h"
26 
27 #include <atomic>
28 #include <cstring>
29 #include <functional>
30 
31 namespace tbb {
32 namespace detail {
33 namespace r1 {
34 
35 #if __TBB_ARENA_BINDING
36 class numa_binding_observer : public tbb::task_scheduler_observer {
37     binding_handler* my_binding_handler;
38 public:
39     numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
40         : task_scheduler_observer(*ta)
41         , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
42     {}
43 
44     void on_scheduler_entry( bool ) override {
45         apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
46     }
47 
48     void on_scheduler_exit( bool ) override {
49         restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
50     }
51 
52     ~numa_binding_observer(){
53         destroy_binding_handler(my_binding_handler);
54     }
55 };
56 
57 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
58     numa_binding_observer* binding_observer = nullptr;
59     if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
60         binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
61         __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
62         binding_observer->observe(true);
63     }
64     return binding_observer;
65 }
66 
67 void destroy_binding_observer( numa_binding_observer* binding_observer ) {
68     __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
69     binding_observer->observe(false);
70     binding_observer->~numa_binding_observer();
71     deallocate_memory(binding_observer);
72 }
73 #endif /*!__TBB_ARENA_BINDING*/
74 
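// Looks for a free slot in the index range [lower, upper): the search starts from the slot
// this thread occupied last time (or from a random in-range slot) and wraps around, so
// concurrently joining threads tend to probe different slots first.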
75 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
76     if ( lower >= upper ) return out_of_arena;
77     // Start the search for an empty slot from the one we occupied last time
78     std::size_t index = tls.my_arena_index;
79     if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
80     __TBB_ASSERT( index >= lower && index < upper, NULL );
81     // Find a free slot
82     for ( std::size_t i = index; i < upper; ++i )
83         if (my_slots[i].try_occupy()) return i;
84     for ( std::size_t i = lower; i < index; ++i )
85         if (my_slots[i].try_occupy()) return i;
86     return out_of_arena;
87 }
88 
89 template <bool as_worker>
90 std::size_t arena::occupy_free_slot(thread_data& tls) {
91     // Firstly, external threads try to occupy reserved slots
92     std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
93     if ( index == out_of_arena ) {
94         // Secondly, all threads try to occupy all non-reserved slots
95         index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
96         // Likely this arena is already saturated
97         if ( index == out_of_arena )
98             return out_of_arena;
99     }
100 
101     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
102     return index;
103 }
104 
105 std::uintptr_t arena::calculate_stealing_threshold() {
106     stack_anchor_type anchor;
107     return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
108 }
109 
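// Entry point of a worker thread into this arena: occupy a non-reserved slot, attach the
// slot's default task dispatcher, notify entry observers, run the outermost dispatch loop
// until the arena runs out of work, then notify exit observers, detach, and leave.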
110 void arena::process(thread_data& tls) {
111     governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
112     __TBB_ASSERT( is_alive(my_guard), nullptr);
113     __TBB_ASSERT( my_num_slots > 1, nullptr);
114 
115     std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
116     if (index == out_of_arena) {
117         on_thread_leaving<ref_worker>();
118         return;
119     }
120     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
121     tls.attach_arena(*this, index);
122 
123     task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
124     task_disp.set_stealing_threshold(calculate_stealing_threshold());
125     __TBB_ASSERT(task_disp.can_steal(), nullptr);
126     tls.attach_task_dispatcher(task_disp);
127 
128     __TBB_ASSERT( !tls.my_last_observer, "Local observers must not have been notified yet when entering the arena" );
129     my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
130 
131     // Wait on a special object tied to this arena
132     outermost_worker_waiter waiter(*this);
133     d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
134     __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
135     __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
136     __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);
137 
138     my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
139     tls.my_last_observer = nullptr;
140 
141     task_disp.set_stealing_threshold(0);
142     tls.detach_task_dispatcher();
143 
144     // Arena slot detach (arena may be used in market::process)
145     // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
146     tls.my_arena_slot->release();
147     tls.my_arena_slot = nullptr;
148     tls.my_inbox.detach();
149     __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
150     __TBB_ASSERT(is_alive(my_guard), nullptr);
151 
152     // In contrast to earlier versions of TBB (before 3.0 U5), the arena may now be
153     // temporarily left unpopulated by threads. See the comments in
154     // arena::on_thread_leaving() for more details.
155     on_thread_leaving<ref_worker>();
156     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
157 }
158 
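// The constructor initializes the bookkeeping fields and then constructs, for each of the
// my_num_slots slots, its mailbox, task streams, and default task dispatcher (the
// dispatchers are placed in memory right after the slot array, see base_td_pointer below).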
159 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
160 {
161     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
162     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
163     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
164     my_market = &m;
165     my_limit = 1;
166     // Two slots are mandatory: one for the external thread and one for a worker (required to support starvation-resistant tasks).
167     my_num_slots = num_arena_slots(num_slots);
168     my_num_reserved_slots = num_reserved_slots;
169     my_max_num_workers = num_slots-num_reserved_slots;
170     my_priority_level = priority_level;
171     my_references = ref_external; // accounts for the external thread
172     my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
173     my_observers.my_arena = this;
174     my_co_cache.init(4 * num_slots);
175     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
176     // Initialize the default context. It should be allocated before task_dispatcher construction.
177     my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
178         d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
179     // Construct slots. Mark internal synchronization elements for the tools.
180     task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
181     for( unsigned i = 0; i < my_num_slots; ++i ) {
182         // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
183         __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
184         __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
185         mailbox(i).construct();
186         my_slots[i].init_task_streams(i);
187         my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
188         my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
189     }
190     my_fifo_task_stream.initialize(my_num_slots);
191     my_resume_task_stream.initialize(my_num_slots);
192 #if __TBB_PREVIEW_CRITICAL_TASKS
193     my_critical_task_stream.initialize(my_num_slots);
194 #endif
195 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
196     my_local_concurrency_requests = 0;
197     my_local_concurrency_flag.clear();
198     my_global_concurrency_mode.store(false, std::memory_order_relaxed);
199 #endif
200 }
201 
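// A rough sketch of the single cache-aligned block produced by allocate_arena(), as far as
// it can be inferred from this function, the constructor, and free_arena() (an editorial
// reading, not an authoritative layout description):
//
//   | mail_outbox x N (addressed in reverse via mailbox(i)) | arena_base | arena_slot x N | task_dispatcher x N |
//
// where N == num_arena_slots(num_slots); the arena object itself is constructed right
// after the mailbox area, and free_arena() deallocates starting from &mailbox(N-1).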
202 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
203                               unsigned priority_level )
204 {
205     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
206     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
207     __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
208     std::size_t n = allocation_size(num_arena_slots(num_slots));
209     unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
210     // Zero all slots to indicate that they are empty
211     std::memset( storage, 0, n );
212     return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
213         arena(m, num_slots, num_reserved_slots, priority_level);
214 }
215 
216 void arena::free_arena () {
217     __TBB_ASSERT( is_alive(my_guard), NULL );
218     __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
219     __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
220     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
221                   "Inconsistent state of a dying arena" );
222 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
223     __TBB_ASSERT( !my_global_concurrency_mode, NULL );
224 #endif
225     poison_value( my_guard );
226     std::intptr_t drained = 0;
227     for ( unsigned i = 0; i < my_num_slots; ++i ) {
228         // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
229         // TODO: understand the assertion and modify
230         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
231         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
232         my_slots[i].free_task_pool();
233         drained += mailbox(i).drain();
234         my_slots[i].my_default_task_dispatcher->~task_dispatcher();
235     }
236     __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
237     __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
238     // Cleanup coroutines/schedulers cache
239     // Clean up the coroutines/schedulers cache
240     my_default_ctx->~task_group_context();
241     cache_aligned_deallocate(my_default_ctx);
242 #if __TBB_PREVIEW_CRITICAL_TASKS
243     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
244 #endif
245     // remove an internal reference
246     my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
247     if ( !my_observers.empty() ) {
248         my_observers.clear();
249     }
250     void* storage  = &mailbox(my_num_slots-1);
251     __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
252     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
253     this->~arena();
254 #if TBB_USE_ASSERT > 1
255     std::memset( storage, 0, allocation_size(my_num_slots) );
256 #endif /* TBB_USE_ASSERT */
257     cache_aligned_deallocate( storage );
258 }
259 
260 bool arena::has_enqueued_tasks() {
261     return !my_fifo_task_stream.empty();
262 }
263 
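// my_pool_state acts as a small state machine: SNAPSHOT_FULL -> <busy> while a single
// thread verifies that all task pools and task streams are empty, then either
// <busy> -> SNAPSHOT_EMPTY (work is really absent and worker demand is withdrawn) or
// <busy> -> SNAPSHOT_FULL (the attempt was invalidated by newly published work). Any
// other value means another thread currently holds the <busy> token.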
264 bool arena::is_out_of_work() {
265 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
266     if (my_local_concurrency_flag.try_clear_if([this] {
267         return !has_enqueued_tasks();
268     })) {
269         my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
270     }
271 #endif
272 
273     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
274     switch (my_pool_state.load(std::memory_order_acquire)) {
275     case SNAPSHOT_EMPTY:
276         return true;
277     case SNAPSHOT_FULL: {
278         // Use unique id for "busy" in order to avoid ABA problems.
279         const pool_state_t busy = pool_state_t(&busy);
280         // Helper for CAS execution
281         pool_state_t expected_state;
282 
283         // Request permission to take snapshot
284         expected_state = SNAPSHOT_FULL;
285         if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
286             // Got permission. Take the snapshot.
287             // NOTE: This is not a lock, as the state can be set to FULL at
288             //       any moment by a thread that spawns/enqueues a new task.
289             std::size_t n = my_limit.load(std::memory_order_acquire);
290             // Make local copies of the shared parameters. Any change to them while
291             // the snapshot is being taken invalidates the attempt and returns this
292             // thread to the dispatch loop.
293             std::size_t k;
294             for (k = 0; k < n; ++k) {
295                 if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
296                     my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
297                 {
298                     // k-th primary task pool is nonempty and does contain tasks.
299                     break;
300                 }
301                 if (my_pool_state.load(std::memory_order_acquire) != busy)
302                     return false; // the work was published
303             }
304             bool work_absent = k == n;
305             // Test and test-and-set.
306             if (my_pool_state.load(std::memory_order_acquire) == busy) {
307                 bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
308 #if __TBB_PREVIEW_CRITICAL_TASKS
309                 no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
310 #endif
311                 work_absent = work_absent && no_stream_tasks;
312                 if (work_absent) {
313                     // Save the current demand value before setting SNAPSHOT_EMPTY
314                     // to avoid a race with advertise_new_work.
315                     int current_demand = (int)my_max_num_workers;
316                     expected_state = busy;
317                     if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
318                         // This thread transitioned pool to empty state, and thus is
319                         // responsible for telling the market that there is no work to do.
320                         my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
321                         return true;
322                     }
323                     return false;
324                 }
325                 // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
326                 expected_state = busy;
327                 my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
328             }
329         }
330         return false;
331     }
332     default:
333         // Another thread is taking a snapshot.
334         return false;
335     }
336 }
337 
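// Enqueue path for task_arena::enqueue(): bind the context, clear the task's isolation
// tag, push it into a random lane of the FIFO stream, and advertise the new work so that
// the market can wake workers if needed.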
338 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
339     task_group_context_impl::bind_to(ctx, &td);
340     task_accessor::context(t) = &ctx;
341     task_accessor::isolation(t) = no_isolation;
342     my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
343     advertise_new_work<work_enqueued>();
344 }
345 
346 } // namespace r1
347 } // namespace detail
348 } // namespace tbb
349 
350 // Enable task_arena.h
351 #include "oneapi/tbb/task_arena.h" // task_arena_base
352 
353 namespace tbb {
354 namespace detail {
355 namespace r1 {
356 
357 #if TBB_USE_ASSERT
358 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
359     bool is_arena_priority_correct =
360         a_priority == tbb::task_arena::priority::high   ||
361         a_priority == tbb::task_arena::priority::normal ||
362         a_priority == tbb::task_arena::priority::low;
363     __TBB_ASSERT( is_arena_priority_correct,
364                   "Task arena priority should be equal to one of the predefined values." );
365 }
366 #else
367 void assert_arena_priority_valid( tbb::task_arena::priority ) {}
368 #endif
369 
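// Worked example (assuming the usual three predefined priorities, i.e.
// market::num_priority_levels == 3 and low/normal/high equal to 1, 2 and 3 times
// d1::priority_stride): high maps to priority level 0, normal to 1, low to 2, and
// arena_priority() below performs the inverse mapping.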
370 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
371     assert_arena_priority_valid( a_priority );
372     return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
373 }
374 
375 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
376     auto priority = tbb::task_arena::priority(
377         (market::num_priority_levels - priority_level) * d1::priority_stride
378     );
379     assert_arena_priority_valid( priority );
380     return priority;
381 }
382 
383 struct task_arena_impl {
384     static void initialize(d1::task_arena_base&);
385     static void terminate(d1::task_arena_base&);
386     static bool attach(d1::task_arena_base&);
387     static void execute(d1::task_arena_base&, d1::delegate_base&);
388     static void wait(d1::task_arena_base&);
389     static int max_concurrency(const d1::task_arena_base*);
390     static void enqueue(d1::task&, d1::task_arena_base*);
391 };
392 
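// The __TBB_EXPORTED_FUNC wrappers below form the binary interface that the header-side
// d1::task_arena code dispatches into. A hedged sketch of how user code typically ends up
// here (user-side code, not part of this file):
//
//   tbb::task_arena a(4);
//   a.initialize();              // reaches r1::initialize() -> task_arena_impl::initialize()
//   a.execute([]{ /* ... */ });  // reaches r1::execute()    -> task_arena_impl::execute()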
393 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
394     task_arena_impl::initialize(ta);
395 }
396 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
397     task_arena_impl::terminate(ta);
398 }
399 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
400     return task_arena_impl::attach(ta);
401 }
402 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
403     task_arena_impl::execute(ta, d);
404 }
405 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
406     task_arena_impl::wait(ta);
407 }
408 
409 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
410     return task_arena_impl::max_concurrency(ta);
411 }
412 
413 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
414     task_arena_impl::enqueue(t, ta);
415 }
416 
417 void task_arena_impl::initialize(d1::task_arena_base& ta) {
418     governor::one_time_init();
419     if (ta.my_max_concurrency < 1) {
420 #if __TBB_ARENA_BINDING
421 
422 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
423         d1::constraints arena_constraints = d1::constraints{}
424             .set_core_type(ta.core_type())
425             .set_max_threads_per_core(ta.max_threads_per_core())
426             .set_numa_id(ta.my_numa_id);
427         ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
428 #else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
429         ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
430 #endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
431 
432 #else /*!__TBB_ARENA_BINDING*/
433         ta.my_max_concurrency = (int)governor::default_num_threads();
434 #endif /*!__TBB_ARENA_BINDING*/
435     }
436 
437     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
438     unsigned priority_level = arena_priority_level(ta.my_priority);
439     arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
440     ta.my_arena.store(a, std::memory_order_release);
441     // add an internal market reference; a public reference was added in create_arena
442     market::global_market( /*is_public=*/false);
443 #if __TBB_ARENA_BINDING
444     a->my_numa_binding_observer = construct_binding_observer(
445         static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
446 #endif /*__TBB_ARENA_BINDING*/
447 }
448 
449 void task_arena_impl::terminate(d1::task_arena_base& ta) {
450     arena* a = ta.my_arena.load(std::memory_order_relaxed);
451     assert_pointer_valid(a);
452 #if __TBB_ARENA_BINDING
453     if(a->my_numa_binding_observer != nullptr ) {
454         destroy_binding_observer(a->my_numa_binding_observer);
455         a->my_numa_binding_observer = nullptr;
456     }
457 #endif /*__TBB_ARENA_BINDING*/
458     a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
459     a->on_thread_leaving<arena::ref_external>();
460     ta.my_arena.store(nullptr, std::memory_order_relaxed);
461 }
462 
463 bool task_arena_impl::attach(d1::task_arena_base& ta) {
464     __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
465     thread_data* td = governor::get_thread_data_if_initialized();
466     if( td && td->my_arena ) {
467         arena* a = td->my_arena;
468         // There is an active arena to attach to.
469         // It's still used by this thread, so it won't be destroyed right away.
470         __TBB_ASSERT(a->my_references > 0, NULL );
471         a->my_references += arena::ref_external;
472         ta.my_num_reserved_slots = a->my_num_reserved_slots;
473         ta.my_priority = arena_priority(a->my_priority_level);
474         ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
475         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
476         ta.my_arena.store(a, std::memory_order_release);
477         // increases market's ref count for task_arena
478         market::global_market( /*is_public=*/true );
479         return true;
480     }
481     return false;
482 }
483 
484 void task_arena_impl::enqueue(d1::task& t, d1::task_arena_base* ta) {
485     thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
486     arena* a = ta->my_arena.load(std::memory_order_relaxed);
487     assert_pointers_valid(ta, a, a->my_default_ctx, td);
488     // Is there a better place for checking the state of my_default_ctx?
489     __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
490                  "The task will not be executed because the default task_group_context of the task_arena is cancelled. Has a previously enqueued task thrown an exception?");
491     a->enqueue_task(t, *a->my_default_ctx, *td);
492 }
493 
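// RAII helper for running a delegate inside a (possibly different) arena: the constructor
// re-binds the calling thread to the target arena slot when the arenas differ (adjusting
// worker demand and notifying entry observers), and the destructor restores the original
// arena, task dispatcher, and execute data.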
494 class nested_arena_context : no_copy {
495 public:
496     nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
497         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
498     {
499         if (td.my_arena != &nested_arena) {
500             m_orig_arena = td.my_arena;
501             m_orig_slot_index = td.my_arena_index;
502             m_orig_last_observer = td.my_last_observer;
503 
504             td.detach_task_dispatcher();
505             td.attach_arena(nested_arena, slot_index);
506             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
507             task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
508             td.attach_task_dispatcher(task_disp);
509 
510             // If the calling thread occupies a slot outside the external thread reserve, we need to notify
511             // the market that this arena requires one less worker.
512             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
513                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
514             }
515 
516             td.my_last_observer = nullptr;
517             // The task_arena::execute method considers each calling thread as an external thread.
518             td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
519         }
520 
521         m_task_dispatcher = td.my_task_dispatcher;
522         m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
523         m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
524         m_task_dispatcher->m_properties.critical_task_allowed = true;
525 
526         execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
527         ed_ext.context = td.my_arena->my_default_ctx;
528         ed_ext.original_slot = td.my_arena_index;
529         ed_ext.affinity_slot = d1::no_slot;
530         ed_ext.task_disp = td.my_task_dispatcher;
531         ed_ext.isolation = no_isolation;
532 
533         __TBB_ASSERT(td.my_arena_slot, nullptr);
534         __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
535         __TBB_ASSERT(td.my_task_dispatcher, nullptr);
536     }
537     ~nested_arena_context() {
538         thread_data& td = *m_task_dispatcher->m_thread_data;
539         __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
540         m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
541         m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
542         if (m_orig_arena) {
543             td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
544             td.my_last_observer = m_orig_last_observer;
545 
546             // Notify the market that this thread is releasing one slot
547             // that can be used by a worker thread.
548             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
549                 td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
550             }
551 
552             td.my_task_dispatcher->set_stealing_threshold(0);
553             td.detach_task_dispatcher();
554             td.my_arena_slot->release();
555             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
556 
557             td.attach_arena(*m_orig_arena, m_orig_slot_index);
558             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
559         }
560         td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
561     }
562 
563 private:
564     execution_data_ext    m_orig_execute_data_ext{};
565     arena*              m_orig_arena{ nullptr };
566     observer_proxy*     m_orig_last_observer{ nullptr };
567     task_dispatcher*    m_task_dispatcher{ nullptr };
568     unsigned            m_orig_slot_index{};
569     bool                m_orig_fifo_tasks_allowed{};
570     bool                m_orig_critical_task_allowed{};
571 };
572 
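// Task used by task_arena_impl::execute() when the calling thread cannot occupy a slot in
// the target arena: the delegate is wrapped into this task and enqueued, while the caller
// blocks on the arena's exit monitor / wait_context until finalize() signals completion.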
573 class delegated_task : public d1::task {
574     d1::delegate_base&  m_delegate;
575     concurrent_monitor& m_monitor;
576     d1::wait_context&   m_wait_ctx;
577     std::atomic<bool>   m_completed;
578     d1::task* execute(d1::execution_data& ed) override {
579         const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
580         execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
581         __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
582             "The execute data shall point to the current task dispatcher execute data");
583         __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);
584 
585         ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
586         bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
587         try_call([&] {
588             m_delegate();
589         }).on_completion([&] {
590             ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
591             ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
592         });
593 
594         finalize();
595         return nullptr;
596     }
597     d1::task* cancel(d1::execution_data&) override {
598         finalize();
599         return nullptr;
600     }
601     void finalize() {
602         m_wait_ctx.release(); // must precede the wakeup
603         m_monitor.notify([this](std::uintptr_t ctx) {
604             return ctx == std::uintptr_t(&m_delegate);
605         }); // do not relax, it needs a fence!
606         m_completed.store(true, std::memory_order_release);
607     }
608 public:
609     delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
610         : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
611     ~delegated_task() {
612         // The destructor can be called before m_monitor is notified because the
613         // waiting thread can be released as soon as m_wait_ctx is released.
614         // To close that race we wait for the m_completed signal.
615         spin_wait_until_eq(m_completed, true);
616     }
617 };
618 
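// Flow of task_arena::execute(): fast path if the thread is already in the target arena;
// otherwise try to occupy a free slot (external threads try the reserved ones first); if
// none is available, enqueue a delegated_task and wait on my_exit_monitors, retrying the
// slot acquisition each time this thread is woken up.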
619 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
620     arena* a = ta.my_arena.load(std::memory_order_relaxed);
621     __TBB_ASSERT(a != nullptr, nullptr);
622     thread_data* td = governor::get_thread_data();
623 
624     bool same_arena = td->my_arena == a;
625     std::size_t index1 = td->my_arena_index;
626     if (!same_arena) {
627         index1 = a->occupy_free_slot</*as_worker */false>(*td);
628         if (index1 == arena::out_of_arena) {
629             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
630             d1::wait_context wo(1);
631             d1::task_group_context exec_context(d1::task_group_context::isolated);
632             task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);
633 
634             delegated_task dt(d, a->my_exit_monitors, wo);
635             a->enqueue_task( dt, exec_context, *td);
636             size_t index2 = arena::out_of_arena;
637             do {
638                 a->my_exit_monitors.prepare_wait(waiter);
639                 if (!wo.continue_execution()) {
640                     a->my_exit_monitors.cancel_wait(waiter);
641                     break;
642                 }
643                 index2 = a->occupy_free_slot</*as_worker*/false>(*td);
644                 if (index2 != arena::out_of_arena) {
645                     a->my_exit_monitors.cancel_wait(waiter);
646                     nested_arena_context scope(*td, *a, index2 );
647                     r1::wait(wo, exec_context);
648                     __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
649                     break;
650                 }
651                 a->my_exit_monitors.commit_wait(waiter);
652             } while (wo.continue_execution());
653             if (index2 == arena::out_of_arena) {
654                 // notify a waiting thread even if this thread did not enter arena,
655                 // in case it was woken by a leaving thread but did not need to enter
656                 a->my_exit_monitors.notify_one(); // do not relax!
657             }
658             // process possible exception
659             if (exec_context.my_exception) {
660                 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
661                 exec_context.my_exception->throw_self();
662             }
663             __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
664             return;
665         } // if (index1 == arena::out_of_arena)
666     } // if (!same_arena)
667 
668     context_guard_helper</*report_tasks=*/false> context_guard;
669     context_guard.set_ctx(a->my_default_ctx);
670     nested_arena_context scope(*td, *a, index1);
671 #if _WIN64
672     try {
673 #endif
674         d();
675         __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
676 #if _WIN64
677     } catch (...) {
678         context_guard.restore_default();
679         throw;
680     }
681 #endif
682 }
683 
684 void task_arena_impl::wait(d1::task_arena_base& ta) {
685     arena* a = ta.my_arena.load(std::memory_order_relaxed);
686     __TBB_ASSERT(a != nullptr, nullptr);
687     thread_data* td = governor::get_thread_data();
688     __TBB_ASSERT_EX(td, "Scheduler is not initialized");
689     __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
690     if (a->my_max_num_workers != 0) {
691         while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
692             yield();
693         }
694     }
695 }
696 
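// Resolution order for max_concurrency(): an existing arena (reserved slots + workers,
// plus one while enforced concurrency is active), then an explicit max_concurrency of 1,
// then NUMA/core-type constraints when arena binding is enabled, and finally the default
// number of threads.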
697 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
698     arena* a = nullptr;
699     if( ta ) // for special cases of ta->max_concurrency()
700         a = ta->my_arena.load(std::memory_order_relaxed);
701     else if( thread_data* td = governor::get_thread_data_if_initialized() )
702         a = td->my_arena; // the current arena if any
703 
704     if( a ) { // Get parameters from the arena
705         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
706         return a->my_num_reserved_slots + a->my_max_num_workers
707 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
708             + (a->my_local_concurrency_flag.test() ? 1 : 0)
709 #endif
710             ;
711     }
712 
713     if (ta && ta->my_max_concurrency == 1) {
714         return 1;
715     }
716 
717 #if __TBB_ARENA_BINDING
718     if (ta) {
719 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
720         d1::constraints arena_constraints = d1::constraints{}
721             .set_numa_id(ta->my_numa_id)
722             .set_core_type(ta->core_type())
723             .set_max_threads_per_core(ta->max_threads_per_core());
724         return (int)default_concurrency(arena_constraints);
725 #else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
726         return (int)default_concurrency(ta->my_numa_id);
727 #endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
728     }
729 #endif /*!__TBB_ARENA_BINDING*/
730 
731     __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
732     return int(governor::default_num_threads());
733 }
734 
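// Temporarily replaces the isolation tag of the calling thread's dispatcher with either
// the caller-provided tag or one derived from the delegate's address, runs the delegate,
// and restores the previous tag even if an exception escapes (via try_call/on_completion).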
735 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
736     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
737     thread_data* tls = governor::get_thread_data();
738     assert_pointers_valid(tls, tls->my_task_dispatcher);
739     task_dispatcher* dispatcher = tls->my_task_dispatcher;
740     isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
741     try_call([&] {
742         // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
743         isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
744         // Save the current isolation value and set new one
745         previous_isolation = dispatcher->set_isolation(current_isolation);
746         // Isolation within this callable
747         d();
748     }).on_completion([&] {
749         __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
750         dispatcher->set_isolation(previous_isolation);
751     });
752 }
753 
754 } // namespace r1
755 } // namespace detail
756 } // namespace tbb
757 
758