xref: /oneTBB/src/tbb/arena.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "task_dispatcher.h"
18 #include "governor.h"
19 #include "arena.h"
20 #include "itt_notify.h"
21 #include "semaphore.h"
22 #include "waiters.h"
23 #include "oneapi/tbb/detail/_task.h"
24 #include "oneapi/tbb/tbb_allocator.h"
25 
26 #include <atomic>
27 #include <cstring>
28 #include <functional>
29 
30 namespace tbb {
31 namespace detail {
32 namespace r1 {
33 
34 #if __TBB_NUMA_SUPPORT
35 class numa_binding_observer : public tbb::task_scheduler_observer {
36     int my_numa_node_id;
37     binding_handler* my_binding_handler;
38 public:
39     numa_binding_observer( d1::task_arena* ta, int numa_id, int num_slots )
40         : task_scheduler_observer(*ta)
41         , my_numa_node_id(numa_id)
42         , my_binding_handler(construct_binding_handler(num_slots))
43     {}
44 
45     void on_scheduler_entry( bool ) override {
46         bind_thread_to_node( my_binding_handler, this_task_arena::current_thread_index(), my_numa_node_id);
47     }
48 
49     void on_scheduler_exit( bool ) override {
50         restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
51     }
52 
53     ~numa_binding_observer(){
54         destroy_binding_handler(my_binding_handler);
55     }
56 };
57 
58 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int numa_id, int num_slots ) {
59     numa_binding_observer* binding_observer = nullptr;
60     // numa_topology initialization is performed lazily inside the numa_node_count() call
61     if (numa_id >= 0 && numa_node_count() > 1) {
62         binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, numa_id, num_slots);
63         __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
64         binding_observer->observe(true);
65     }
66     return binding_observer;
67 }
68 
69 void destroy_binding_observer( numa_binding_observer* binding_observer ) {
70     __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
71     binding_observer->observe(false);
72     binding_observer->~numa_binding_observer();
73     deallocate_memory(binding_observer);
74 }
75 #endif
76 
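// Scans [lower, upper) for a free slot, starting from the slot this thread occupied last time
// (or a random index in the range if that one is out of range) and wrapping around; returns the
// index of the occupied slot, or out_of_arena if every slot in the range is taken.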
77 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
78     if ( lower >= upper ) return out_of_arena;
79     // Start search for an empty slot from the one we occupied the last time
80     std::size_t index = tls.my_arena_index;
81     if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
82     __TBB_ASSERT( index >= lower && index < upper, NULL );
83     // Find a free slot
84     for ( std::size_t i = index; i < upper; ++i )
85         if (my_slots[i].try_occupy()) return i;
86     for ( std::size_t i = lower; i < index; ++i )
87         if (my_slots[i].try_occupy()) return i;
88     return out_of_arena;
89 }
90 
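// Two-phase slot acquisition: masters first try the reserved range [0, my_num_reserved_slots),
// then every thread, worker or master, falls back to the non-reserved range.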
91 template <bool as_worker>
92 std::size_t arena::occupy_free_slot(thread_data& tls) {
93     // Firstly, masters try to occupy reserved slots
94     std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
95     if ( index == out_of_arena ) {
96         // Secondly, all threads try to occupy all non-reserved slots
97         index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
98         // Likely this arena is already saturated
99         if ( index == out_of_arena )
100             return out_of_arena;
101     }
102 
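    // Grow my_limit monotonically (atomic_update with std::less) so that thieves scan up to and
    // including the newly occupied slot.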
103     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
104     return index;
105 }
106 
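// The address of a local variable approximates the current stack position; combined with the
// market's worker stack size it gives the depth below which stealing must stop so that enough
// stack head-room remains.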
107 std::uintptr_t arena::calculate_stealing_threshold() {
108     stack_anchor_type anchor;
109     return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
110 }
111 
112 void arena::process(thread_data& tls) {
113     governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
114     __TBB_ASSERT( is_alive(my_guard), nullptr);
115     __TBB_ASSERT( my_num_slots > 1, nullptr);
116 
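    // Try to occupy a non-reserved slot; if the arena is already saturated,
    // drop the worker reference and return this thread to the market.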
117     std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
118     if (index == out_of_arena) {
119         on_thread_leaving<ref_worker>();
120         return;
121     }
122     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
123     tls.attach_arena(*this, index);
124 
125     task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
126     task_disp.set_stealing_threshold(calculate_stealing_threshold());
127     __TBB_ASSERT(task_disp.can_steal(), nullptr);
128     tls.attach_task_dispatcher(task_disp);
129 
130     __TBB_ASSERT( !tls.my_last_observer, "There must be no notified local observers when entering the arena" );
131     my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
132 
133     // Waiting on special object tied to this arena
134     outermost_worker_waiter waiter(*this);
135     d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
136     __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
137     __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
138     __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);
139 
140     my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
141     tls.my_last_observer = nullptr;
142 
143     task_disp.set_stealing_threshold(0);
144     tls.detach_task_dispatcher();
145 
146     // Detach the arena slot (the arena may still be used in market::process)
147     // TODO: Consider moving several of the calls below into a new method (e.g. detach_arena).
148     tls.my_arena_slot->release();
149     tls.my_arena_slot = nullptr;
150     tls.my_inbox.detach();
151     __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
152     __TBB_ASSERT(is_alive(my_guard), nullptr);
153 
154     // In contrast to earlier versions of TBB (before 3.0 U5), the arena may now be
155     // temporarily left without any threads. See the comments in
156     // arena::on_thread_leaving() for more details.
157     on_thread_leaving<ref_worker>();
158     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
159 }
160 
161 arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
162 {
163     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
164     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
165     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
166     my_market = &m;
167     my_limit = 1;
168     // Two slots are mandatory: one for the master and one for a worker (required to support starvation-resistant tasks).
169     my_num_slots = num_arena_slots(num_slots);
170     my_num_reserved_slots = num_reserved_slots;
171     my_max_num_workers = num_slots-num_reserved_slots;
172     my_priority_level = priority_level;
173     my_references = ref_external; // accounts for the master
174     my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
175     my_observers.my_arena = this;
176     my_co_cache.init(4 * num_slots);
177     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
178     // Initialize the default context. It must be allocated before the task dispatchers are constructed.
179     my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
180         d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
181     // Construct slots. Mark internal synchronization elements for the tools.
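    // The per-slot task dispatchers are placement-constructed in the memory that immediately
    // follows the slot array (see allocate_arena for the overall layout).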
182     task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
183     for( unsigned i = 0; i < my_num_slots; ++i ) {
184         // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
185         __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
186         __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
187         mailbox(i).construct();
188         my_slots[i].init_task_streams(i);
189         my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
190         my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
191     }
192     my_fifo_task_stream.initialize(my_num_slots);
193     my_resume_task_stream.initialize(my_num_slots);
194 #if __TBB_PREVIEW_CRITICAL_TASKS
195     my_critical_task_stream.initialize(my_num_slots);
196 #endif
197 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
198     my_local_concurrency_mode = false;
199     my_global_concurrency_mode.store(false, std::memory_order_relaxed);
200 #endif
201 }
202 
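// An arena occupies a single cache-aligned allocation: the per-slot mail outboxes come first,
// followed by the arena object itself together with its slot array and task dispatchers;
// allocation_size() accounts for all of these pieces.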
203 arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
204                               unsigned priority_level )
205 {
206     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
207     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
208     __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
209     std::size_t n = allocation_size(num_arena_slots(num_slots));
210     unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
211     // Zero all slots to indicate that they are empty
212     std::memset( storage, 0, n );
213     return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
214         arena(m, num_slots, num_reserved_slots, priority_level);
215 }
216 
217 void arena::free_arena () {
218     __TBB_ASSERT( is_alive(my_guard), NULL );
219     __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
220     __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
221     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
222                   "Inconsistent state of a dying arena" );
223 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
224     __TBB_ASSERT( !my_global_concurrency_mode, NULL );
225 #endif
226     poison_value( my_guard );
227     std::intptr_t drained = 0;
228     for ( unsigned i = 0; i < my_num_slots; ++i ) {
229         // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
230         // TODO: understand the assertion and modify
231         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
232         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
233         my_slots[i].free_task_pool();
234         drained += mailbox(i).drain();
235         my_slots[i].my_default_task_dispatcher->~task_dispatcher();
236     }
237     __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
238     __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
239     // Cleanup coroutines/schedulers cache
240     my_co_cache.cleanup();
241     my_default_ctx->~task_group_context();
242     cache_aligned_deallocate(my_default_ctx);
243 #if __TBB_PREVIEW_CRITICAL_TASKS
244     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
245 #endif
246     // remove an internal reference
247     my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
248     if ( !my_observers.empty() ) {
249         my_observers.clear();
250     }
251     void* storage  = &mailbox(my_num_slots-1);
252     __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
253     __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
254     this->~arena();
255 #if TBB_USE_ASSERT > 1
256     std::memset( storage, 0, allocation_size(my_num_slots) );
257 #endif /* TBB_USE_ASSERT */
258     cache_aligned_deallocate( storage );
259 }
260 
261 bool arena::has_enqueued_tasks() {
262     return !my_fifo_task_stream.empty();
263 }
264 
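// Snapshot protocol: my_pool_state moves SNAPSHOT_FULL -> busy -> SNAPSHOT_EMPTY (or back to
// SNAPSHOT_FULL). The thread that wins the CAS to 'busy' scans every task pool and task stream;
// if no work is found and the state is still 'busy', it publishes SNAPSHOT_EMPTY and retracts
// this arena's worker demand from the market.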
265 bool arena::is_out_of_work() {
266     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
267     for(;;) {
268         switch( my_pool_state.load(std::memory_order_acquire) ) {
269             case SNAPSHOT_EMPTY:
270                 return true;
271             case SNAPSHOT_FULL: {
272                 // Use unique id for "busy" in order to avoid ABA problems.
273                 const pool_state_t busy = pool_state_t(&busy);
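                // The address of this stack variable is unique to the current snapshot attempt,
                // so a stale compare-and-swap left over from an earlier attempt cannot succeed.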
274                 // Helper for CAS execution
275                 pool_state_t expected_state;
276 
277                 // Request permission to take snapshot
278                 expected_state = SNAPSHOT_FULL;
279                 if( my_pool_state.compare_exchange_strong( expected_state, busy ) ) {
280                     // Got permission. Take the snapshot.
281                     // NOTE: This is not a lock, as the state can be set to FULL at
282                     //       any moment by a thread that spawns/enqueues new task.
283                     std::size_t n = my_limit.load(std::memory_order_acquire);
284                     // Make local copies of the shared parameters. If they change while the
285                     // snapshot is being taken, the attempt is invalidated and this thread
286                     // returns to the dispatch loop.
287                     std::size_t k;
288                     for( k=0; k<n; ++k ) {
289                         if( my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
290                             my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed) )
291                         {
292                             // The k-th primary task pool is nonempty, i.e. it contains tasks.
293                             break;
294                         }
295                         if( my_pool_state.load(std::memory_order_acquire)!=busy )
296                             return false; // the work was published
297                     }
298                     __TBB_ASSERT( k <= n, NULL );
299                     bool work_absent = k == n;
300                     // Test and test-and-set.
301                     if( my_pool_state.load(std::memory_order_acquire)==busy ) {
302                         bool no_stream_tasks = my_fifo_task_stream.empty() && my_resume_task_stream.empty();
303 #if __TBB_PREVIEW_CRITICAL_TASKS
304                         no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
305 #endif
306                         work_absent = work_absent && no_stream_tasks;
307                         if( work_absent ) {
308                                 // Save the current demand value before setting SNAPSHOT_EMPTY
309                                 // to avoid a race with advertise_new_work.
310                                 int current_demand = (int)my_max_num_workers;
311                                 expected_state = busy;
312                                 if( my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_EMPTY ) ) {
313                                     // This thread transitioned pool to empty state, and thus is
314                                     // responsible for telling the market that there is no work to do.
315                                     my_market->adjust_demand( *this, -current_demand );
316                                     return true;
317                                 }
318                                 return false;
319                         }
320                         // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
321                         expected_state = busy;
322                         my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_FULL );
323                     }
324                 }
325                 return false;
326             }
327             default:
328                 // Another thread is taking a snapshot.
329                 return false;
330         }
331     }
332 }
333 
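// Enqueued tasks are bound to the supplied context, carry no isolation tag, and are pushed into
// a randomly selected lane of the fifo stream; the arena then advertises the new work so that
// workers can be requested from the market.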
334 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
335     task_group_context_impl::bind_to(ctx, &td);
336     task_accessor::context(t) = &ctx;
337     task_accessor::isolation(t) = no_isolation;
338     my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
339     advertise_new_work<work_enqueued>();
340 }
341 
342 } // namespace r1
343 } // namespace detail
344 } // namespace tbb
345 
346 // Enable task_arena.h
347 #include "oneapi/tbb/task_arena.h" // task_arena_base
348 
349 namespace tbb {
350 namespace detail {
351 namespace r1 {
352 
353 #if TBB_USE_ASSERT
354 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
355     bool is_arena_priority_correct =
356         a_priority == tbb::task_arena::priority::high   ||
357         a_priority == tbb::task_arena::priority::normal ||
358         a_priority == tbb::task_arena::priority::low;
359     __TBB_ASSERT( is_arena_priority_correct,
360                   "Task arena priority should be equal to one of the predefined values." );
361 }
362 #else
363 void assert_arena_priority_valid( tbb::task_arena::priority ) {}
364 #endif
365 
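// task_arena priorities are multiples of d1::priority_stride, so a numerically higher priority
// maps to a numerically lower market priority level; arena_priority() below is the inverse
// mapping.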
366 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
367     assert_arena_priority_valid( a_priority );
368     return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
369 }
370 
371 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
372     auto priority = tbb::task_arena::priority(
373         (market::num_priority_levels - priority_level) * d1::priority_stride
374     );
375     assert_arena_priority_valid( priority );
376     return priority;
377 }
378 
379 struct task_arena_impl {
380     static void initialize(d1::task_arena_base&);
381     static void terminate(d1::task_arena_base&);
382     static bool attach(d1::task_arena_base&);
383     static void execute(d1::task_arena_base&, d1::delegate_base&);
384     static void wait(d1::task_arena_base&);
385     static int max_concurrency(const d1::task_arena_base*);
386     static void enqueue(d1::task&, d1::task_arena_base*);
387 };
388 
389 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
390     task_arena_impl::initialize(ta);
391 }
392 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
393     task_arena_impl::terminate(ta);
394 }
395 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
396     return task_arena_impl::attach(ta);
397 }
398 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
399     task_arena_impl::execute(ta, d);
400 }
401 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
402     task_arena_impl::wait(ta);
403 }
404 
405 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
406     return task_arena_impl::max_concurrency(ta);
407 }
408 
409 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
410     task_arena_impl::enqueue(t, ta);
411 }
412 
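// Resolves 'automatic' concurrency to the NUMA-node default (or the machine default), creates
// the underlying arena through the market, and takes an extra internal market reference that is
// released later in arena::free_arena.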
413 void task_arena_impl::initialize(d1::task_arena_base& ta) {
414     governor::one_time_init();
415     if (ta.my_max_concurrency < 1) {
416 #if __TBB_NUMA_SUPPORT
417         ta.my_max_concurrency = numa_default_concurrency(ta.my_numa_id);
418 #else /*__TBB_NUMA_SUPPORT*/
419         ta.my_max_concurrency = (int)governor::default_num_threads();
420 #endif /*__TBB_NUMA_SUPPORT*/
421     }
422     __TBB_ASSERT(ta.my_master_slots <= (unsigned)ta.my_max_concurrency,
423         "Number of slots reserved for master should not exceed arena concurrency");
424 
425     __TBB_ASSERT(ta.my_arena == nullptr, "Arena already initialized");
426     unsigned priority_level = arena_priority_level(ta.my_priority);
427     ta.my_arena = market::create_arena(ta.my_max_concurrency, ta.my_master_slots, priority_level, 0);
428     // add an internal market reference; a public reference was added in create_arena
429     market::global_market( /*is_public=*/false);
430 #if __TBB_NUMA_SUPPORT
431     ta.my_arena->my_numa_binding_observer = construct_binding_observer(
432         static_cast<d1::task_arena*>(&ta), ta.my_numa_id, ta.my_arena->my_num_slots);
433 #endif /*__TBB_NUMA_SUPPORT*/
434 }
435 
436 void task_arena_impl::terminate(d1::task_arena_base& ta) {
437     assert_pointer_valid(ta.my_arena);
438 #if __TBB_NUMA_SUPPORT
439     if(ta.my_arena->my_numa_binding_observer != nullptr ) {
440         destroy_binding_observer(ta.my_arena->my_numa_binding_observer);
441         ta.my_arena->my_numa_binding_observer = nullptr;
442     }
443 #endif /*__TBB_NUMA_SUPPORT*/
444     ta.my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
445     ta.my_arena->on_thread_leaving<arena::ref_external>();
446     ta.my_arena = nullptr;
447 }
448 
449 bool task_arena_impl::attach(d1::task_arena_base& ta) {
450     __TBB_ASSERT(!ta.my_arena, nullptr);
451     thread_data* td = governor::get_thread_data_if_initialized();
452     if( td && td->my_arena ) {
453         ta.my_arena = td->my_arena;
454         // There is an active arena to attach to.
455         // It is still in use by this thread, so it will not be destroyed right away.
456         __TBB_ASSERT(ta.my_arena->my_references > 0, NULL );
457         ta.my_arena->my_references += arena::ref_external;
458         ta.my_master_slots = ta.my_arena->my_num_reserved_slots;
459         ta.my_priority = arena_priority(ta.my_arena->my_priority_level);
460         ta.my_max_concurrency = ta.my_master_slots + ta.my_arena->my_max_num_workers;
461         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == ta.my_arena->my_num_slots, NULL);
462         // increases market's ref count for task_arena
463         market::global_market( /*is_public=*/true );
464         return true;
465     }
466     return false;
467 }
468 
469 void task_arena_impl::enqueue(d1::task& t, d1::task_arena_base* ta) {
470     thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
471     assert_pointers_valid(ta, ta->my_arena, ta->my_arena->my_default_ctx, td);
472     // Is there a better place for checking the state of my_default_ctx?
473     __TBB_ASSERT(!ta->my_arena->my_default_ctx->is_group_execution_cancelled(),
474                  "The task will not be executed because the default task_group_context of the task_arena has been cancelled. Has a previously enqueued task thrown an exception?");
475     ta->my_arena->enqueue_task(t, *ta->my_arena->my_default_ctx, *td);
476 }
477 
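// RAII guard used by task_arena::execute: if the calling thread enters a different arena, it is
// re-attached to that arena's slot and default task dispatcher (reducing the market demand when
// a non-reserved slot is taken); the destructor restores the original arena, dispatcher and
// execution data.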
478 class nested_arena_context : no_copy {
479 public:
480     nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
481         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
482     {
483         if (td.my_arena != &nested_arena) {
484             m_orig_arena = td.my_arena;
485             m_orig_slot_index = td.my_arena_index;
486             m_orig_last_observer = td.my_last_observer;
487 
488             td.detach_task_dispatcher();
489             td.attach_arena(nested_arena, slot_index);
490             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
491             task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
492             td.attach_task_dispatcher(task_disp);
493 
494             // If the calling thread occupies a slot outside the master reserve, we need to notify the
495             // market that this arena requires one less worker.
496             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
497                 td.my_arena->my_market->adjust_demand(*td.my_arena, -1);
498             }
499 
500             td.my_last_observer = nullptr;
501             // The task_arena::execute method considers each calling thread as a master.
502             td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
503         }
504 
505         m_task_dispatcher = td.my_task_dispatcher;
506         m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
507         m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
508         m_task_dispatcher->m_properties.critical_task_allowed = true;
509 
510         execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
511         ed_ext.context = td.my_arena->my_default_ctx;
512         ed_ext.original_slot = td.my_arena_index;
513         ed_ext.affinity_slot = d1::no_slot;
514         ed_ext.task_disp = td.my_task_dispatcher;
515         ed_ext.isolation = no_isolation;
516 
517         __TBB_ASSERT(td.my_arena_slot, nullptr);
518         __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
519         __TBB_ASSERT(td.my_task_dispatcher, nullptr);
520     }
521     ~nested_arena_context() {
522         thread_data& td = *m_task_dispatcher->m_thread_data;
523         __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
524         m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
525         m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
526         if (m_orig_arena) {
527             td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
528             td.my_last_observer = m_orig_last_observer;
529 
530             // Notify the market that this thread is releasing one slot,
531             // which can now be used by a worker thread.
532             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
533                 td.my_arena->my_market->adjust_demand(*td.my_arena, 1);
534             }
535 
536             td.my_task_dispatcher->set_stealing_threshold(0);
537             td.detach_task_dispatcher();
538             td.my_arena_slot->release();
539             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
540 
541             td.attach_arena(*m_orig_arena, m_orig_slot_index);
542             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
543         }
544         td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
545     }
546 
547 private:
548     execution_data_ext  m_orig_execute_data_ext{};
549     arena*              m_orig_arena{ nullptr };
550     observer_proxy*     m_orig_last_observer{ nullptr };
551     task_dispatcher*    m_task_dispatcher{ nullptr };
552     unsigned            m_orig_slot_index{};
553     bool                m_orig_fifo_tasks_allowed{};
554     bool                m_orig_critical_task_allowed{};
555 };
556 
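// Wraps the user delegate for the out-of-slot execution path: the task is enqueued into the
// target arena while the calling thread blocks on the arena's exit monitor until the delegate
// has run, or until a slot frees up and the caller can execute it itself.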
557 class delegated_task : public d1::task {
558     d1::delegate_base&  m_delegate;
559     concurrent_monitor& m_monitor;
560     d1::wait_context&   m_wait_ctx;
561     std::atomic<bool>   m_completed;
562     d1::task* execute(d1::execution_data& ed) override {
563         const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
564         execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
565         __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
566             "The execute data shall point to the current task dispatcher execute data");
567         __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);
568 
569         ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
570         bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
571         try_call([&] {
572             m_delegate();
573         }).on_completion([&] {
574             ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
575             ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
576         });
577 
578         finalize();
579         return nullptr;
580     }
581     d1::task* cancel(d1::execution_data&) override {
582         finalize();
583         return nullptr;
584     }
585     void finalize() {
586         m_wait_ctx.release(); // must precede the wakeup
587         m_monitor.notify([this](std::uintptr_t ctx) {
588             return ctx == std::uintptr_t(&m_delegate);
589         }); // do not relax, it needs a fence!
590         m_completed.store(true, std::memory_order_release);
591     }
592 public:
593     delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
594         : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
595     ~delegated_task() {
596         // The destructor can be called before m_monitor is notified,
597         // because the waiting thread may be released as soon as m_wait_ctx is released.
598         // To close that race we wait for the m_completed signal.
599         spin_wait_until_eq(m_completed, true);
600     }
601 };
602 
603 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
604     __TBB_ASSERT(ta.my_arena != nullptr, nullptr);
605     thread_data* td = governor::get_thread_data();
606 
607     bool same_arena = td->my_arena == ta.my_arena;
608     std::size_t index1 = td->my_arena_index;
609     if (!same_arena) {
610         index1 = ta.my_arena->occupy_free_slot</*as_worker */false>(*td);
611         if (index1 == arena::out_of_arena) {
612             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
613             d1::wait_context wo(1);
614             d1::task_group_context exec_context(d1::task_group_context::isolated);
615             task_group_context_impl::copy_fp_settings(exec_context, *ta.my_arena->my_default_ctx);
616 
617             delegated_task dt(d, ta.my_arena->my_exit_monitors, wo);
618             ta.my_arena->enqueue_task( dt, exec_context, *td);
619             size_t index2 = arena::out_of_arena;
620             do {
621                 ta.my_arena->my_exit_monitors.prepare_wait(waiter);
622                 if (!wo.continue_execution()) {
623                     ta.my_arena->my_exit_monitors.cancel_wait(waiter);
624                     break;
625                 }
626                 index2 = ta.my_arena->occupy_free_slot</*as_worker*/false>(*td);
627                 if (index2 != arena::out_of_arena) {
628                     ta.my_arena->my_exit_monitors.cancel_wait(waiter);
629                     nested_arena_context scope(*td, *ta.my_arena, index2 );
630                     r1::wait(wo, exec_context);
631                     __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
632                     break;
633                 }
634                 ta.my_arena->my_exit_monitors.commit_wait(waiter);
635             } while (wo.continue_execution());
636             if (index2 == arena::out_of_arena) {
637                 // Notify a waiting thread even if this thread did not enter the arena,
638                 // in case it was woken by a leaving thread but did not need to enter.
639                 ta.my_arena->my_exit_monitors.notify_one(); // do not relax!
640             }
641             // process possible exception
642             if (exec_context.my_exception) {
643                 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
644                 exec_context.my_exception->throw_self();
645             }
646             __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
647             return;
648         } // if (index1 == arena::out_of_arena)
649     } // if (!same_arena)
650 
651     context_guard_helper</*report_tasks=*/false> context_guard;
652     context_guard.set_ctx(ta.my_arena->my_default_ctx);
653     nested_arena_context scope(*td, *ta.my_arena, index1);
654 #if _WIN64
655     try {
656 #endif
657         d();
658         __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
659 #if _WIN64
660     } catch (...) {
661         context_guard.restore_default();
662         throw;
663     }
664 #endif
665 }
666 
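// Spin-waits (with yields) until the arena has no active workers and its pool snapshot is empty;
// not supported from inside a worker thread of the same arena.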
667 void task_arena_impl::wait(d1::task_arena_base& ta) {
668     __TBB_ASSERT(ta.my_arena != nullptr, nullptr);
669     thread_data* td = governor::get_thread_data();
670     __TBB_ASSERT_EX(td, "Scheduler is not initialized");
671     __TBB_ASSERT(td->my_arena != ta.my_arena || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
672     if (ta.my_arena->my_max_num_workers != 0) {
673         while (ta.my_arena->num_workers_active() || ta.my_arena->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
674             yield();
675         }
676     }
677 }
678 
679 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
680     arena* a = nullptr;
681     if( ta ) // for special cases of ta->max_concurrency()
682         a = ta->my_arena;
683     else if( thread_data* td = governor::get_thread_data_if_initialized() )
684         a = td->my_arena; // the current arena if any
685 
686     if( a ) { // Get parameters from the arena
687         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
688         return a->my_num_reserved_slots + a->my_max_num_workers;
689     }
690 
691     if (ta && ta->my_max_concurrency == 1) {
692         return 1;
693     }
694 
695     __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
696     return int(governor::default_num_threads());
697 }
698 
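// Runs the delegate under an isolation tag (the caller-supplied value, or the address of the
// delegate if none is given); the previous tag is restored on completion, even if the delegate
// throws.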
699 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
700     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
701     thread_data* tls = governor::get_thread_data();
702     assert_pointers_valid(tls, tls->my_task_dispatcher);
703     task_dispatcher* dispatcher = tls->my_task_dispatcher;
704     isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
705     try_call([&] {
706         // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
707         isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
708         // Save the current isolation value and set new one
709         previous_isolation = dispatcher->set_isolation(current_isolation);
710         // Isolation within this callable
711         d();
712     }).on_completion([&] {
713         __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
714         dispatcher->set_isolation(previous_isolation);
715     });
716 }
717 
718 } // namespace r1
719 } // namespace detail
720 } // namespace tbb
721 
722