xref: /oneTBB/src/tbb/arena.cpp (revision 07300f7e)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

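// Searches [lower, upper) for a free slot, starting from the slot this thread used the last time
// it was in the arena (or from a random slot in the range when that index is out of range) and
// wrapping around; returns the occupied slot index, or out_of_arena if every slot in the range is taken.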
std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

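// Derives the per-thread stealing threshold from the address of a local variable (a stack anchor)
// and the market's worker stack size: the dispatcher refuses to steal once its stack grows past
// this point, which bounds stack usage under deep recursive stealing
// (see r1::calculate_stealing_threshold for the exact formula).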
std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots > 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
    // that arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

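// Note on the memory layout, as implied by allocation_size() and the placement-new offsets below:
// a single cache-aligned block holds the per-slot mail_outbox array first, then the arena object
// itself (arena_base followed by the arena_slot array), and finally the per-slot task_dispatcher
// objects that the arena constructor places at my_slots + my_num_slots.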
arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
    if ( !my_observers.empty() ) {
        my_observers.clear();
    }
    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

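// Pool state machine used below: SNAPSHOT_FULL means work may be available, SNAPSHOT_EMPTY means
// the arena has advertised itself as having no work. To take a snapshot, one thread CASes the
// state from SNAPSHOT_FULL to a transient "busy" value and then scans the task pools and streams;
// the address of a local variable serves as the "busy" token so every snapshot attempt is unique
// and concurrent publishers that flip the state back to SNAPSHOT_FULL are detected (avoiding ABA).
// Only the thread that successfully moves busy -> SNAPSHOT_EMPTY tells the market to withdraw workers.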
bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use a unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            //       any moment by a thread that spawns/enqueues new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make a local copy of the volatile parameters. Any change to them during
            // the snapshot-taking procedure invalidates the attempt and returns
            // this thread to the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // The k-th primary task pool is published and contains tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // Save the current demand value before setting SNAPSHOT_EMPTY
                    // to avoid a race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned pool to empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo the previous SNAPSHOT_FULL-->busy transition, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

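// Enqueued (fire-and-forget) tasks bypass the per-thread task pools: the task is bound to the given
// context, stripped of isolation, and pushed into the arena-wide FIFO stream on a randomly chosen
// lane; advertise_new_work<work_enqueued>() then notifies the market that work arrived (and, with
// enforced concurrency enabled, may request a mandatory worker so the task cannot starve).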
void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

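// Conversion between the public task_arena::priority values and the market's internal priority
// levels. Assuming the usual definitions (market::num_priority_levels == 3 and priority
// low/normal/high equal to 1x/2x/3x d1::priority_stride), priority::high maps to level 0 and
// priority::low to level 2, i.e. a lower level means a higher priority; arena_priority() below is
// the inverse mapping.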
unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    governor::one_time_init();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
#if __TBB_ARENA_BINDING
    if (a->my_numa_binding_observer != nullptr) {
        destroy_binding_observer(a->my_numa_binding_observer);
        a->my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still referenced by the current thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, NULL );
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
        ta.my_arena.store(a, std::memory_order_release);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data();  // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
              ta->my_arena.load(std::memory_order_relaxed)
            : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

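// RAII guard used by task_arena::execute(): if the calling thread is not already in the target
// arena, it temporarily detaches from its original arena/slot/task_dispatcher, attaches to the
// given slot of the nested arena, and notifies the entry observers; the destructor reverses all of
// this and restores the original execution context. When the thread already runs in the target
// arena, only the dispatcher properties (FIFO/critical task permissions, execute data) are adjusted.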
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker */ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /* worker */ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena*             m_orig_arena{ nullptr };
    observer_proxy*    m_orig_last_observer{ nullptr };
    task_dispatcher*   m_task_dispatcher{ nullptr };
    unsigned           m_orig_slot_index{};
    bool               m_orig_fifo_tasks_allowed{};
    bool               m_orig_critical_task_allowed{};
};

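// Wrapper used by task_arena::execute() when the calling thread cannot occupy a slot in the target
// arena: the delegate is enqueued as this task and executed by some thread inside the arena, while
// the caller blocks on the arena's exit monitor. finalize() releases the wait_context and wakes the
// caller; the destructor then spin-waits on m_completed so the task is not destroyed while the
// notification is still in flight.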
class delegated_task : public d1::task {
    d1::delegate_base&  m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context&   m_wait_ctx;
    std::atomic<bool>   m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() {
        // The destructor can run before m_monitor is notified because the waiting
        // thread can be released as soon as m_wait_ctx is released. To close that
        // race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

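// task_arena::execute() entry point. Fast path: the calling thread already belongs to the arena or
// manages to occupy a free slot, in which case the delegate runs directly under a
// nested_arena_context. Slow path: every slot is busy, so the delegate is wrapped in a
// delegated_task, enqueued into the arena, and the caller waits on my_exit_monitors, periodically
// retrying to occupy a slot, until either the task completes elsewhere or a slot becomes available.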
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            if (exec_context.my_exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exec_context.my_exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

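// Blocks the calling external thread until the arena has no active workers and its pool state is
// SNAPSHOT_EMPTY, i.e. no work remains to be picked up. This is a plain yield loop and is only
// meaningful for arenas that can have workers at all (my_max_num_workers != 0).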
void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

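// Backend of the public this_task_arena::isolate() API (a usage sketch, for orientation only):
//
//     tbb::this_task_arena::isolate([]{
//         tbb::parallel_for(0, N, body);   // tasks spawned here cannot be intermixed with
//     });                                  // unrelated tasks stolen from outside the lambda
//
// Isolation works by tagging the currently running task dispatcher with a non-zero isolation value
// (the address of the delegate when the caller did not supply one) for the duration of the
// callable, and restoring the previous tag afterwards even if an exception escapes.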
void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb