xref: /oneTBB/src/tbb/arena.cpp (revision f5fb3c91)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "task_dispatcher.h"
18 #include "governor.h"
19 #include "threading_control.h"
20 #include "arena.h"
21 #include "itt_notify.h"
22 #include "semaphore.h"
23 #include "waiters.h"
24 #include "oneapi/tbb/detail/_task.h"
25 #include "oneapi/tbb/info.h"
26 #include "oneapi/tbb/tbb_allocator.h"
27 
28 #include <atomic>
29 #include <cstring>
30 #include <functional>
31 
32 namespace tbb {
33 namespace detail {
34 namespace r1 {
35 
36 #if __TBB_ARENA_BINDING
37 class numa_binding_observer : public tbb::task_scheduler_observer {
38     binding_handler* my_binding_handler;
39 public:
40     numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
41         : task_scheduler_observer(*ta)
42         , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
43     {}
44 
45     void on_scheduler_entry( bool ) override {
46         apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
47     }
48 
49     void on_scheduler_exit( bool ) override {
50         restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
51     }
52 
53     ~numa_binding_observer() override{
54         destroy_binding_handler(my_binding_handler);
55     }
56 };
57 
58 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
59     numa_binding_observer* binding_observer = nullptr;
60     if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
61         binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
62         __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
63         binding_observer->observe(true);
64     }
65     return binding_observer;
66 }
67 
68 void destroy_binding_observer( numa_binding_observer* binding_observer ) {
69     __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
70     binding_observer->observe(false);
71     binding_observer->~numa_binding_observer();
72     deallocate_memory(binding_observer);
73 }
74 #endif /*__TBB_ARENA_BINDING*/
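// Illustrative sketch (kept as a comment, not part of the build): the intended lifecycle of the
// binding observer, mirroring how task_arena_impl::initialize() and arena::free_arena() use the
// two helpers above. All names come from this file.
//
//   numa_binding_observer* obs = construct_binding_observer(
//       ta, num_slots, numa_id, core_type, max_threads_per_core); // may return nullptr
//   // ... threads entering the arena are pinned via on_scheduler_entry() ...
//   if (obs) {
//       destroy_binding_observer(obs); // unsubscribes the observer and frees the binding handler
//   }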
75 
76 void arena::on_thread_leaving(unsigned ref_param) {
77     //
78     // The implementation of the arena destruction synchronization logic has
79     // contained various bugs/flaws at different stages of its evolution, so
80     // below is a detailed description of the issues taken into consideration
81     // in the current design.
82     //
83     // When fire-and-forget tasks (scheduled via task::enqueue()) are used, the
84     // external thread is allowed to leave its arena before all its work is executed,
85     // and the market may temporarily revoke all workers from this arena. Since revoked
86     // workers never attempt to reset the arena state to EMPTY and cancel its request
87     // to RML for threads, the arena object is destroyed only when the last
88     // thread is leaving it and the arena's state is EMPTY (that is, its external
89     // thread has left and it does not contain any work).
90     // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should
91     // not be done here (or anywhere else in the external thread, for that matter); doing
92     // so can result either in the arena's premature destruction (at least without
93     // additional costly checks in workers) or in unnecessary arena state changes
94     // (and the ensuing workers migration).
95     //
96     // A worker that checks for work presence and transitions the arena to the EMPTY
97     // state (in the snapshot-taking procedure arena::out_of_work()) updates
98     // arena::my_pool_state first and only then arena::my_num_workers_requested.
99     // So the check for work absence must be done against the latter field.
100     //
101     // In the time window between decrementing the active threads count and checking
102     // whether there is an outstanding request for workers, a new worker thread may
103     // arrive, finish the remaining work, set the arena state to empty, and leave,
104     // decrementing the refcount and destroying the arena. Then the current thread
105     // would destroy the arena a second time. To preclude this, a local copy of the
106     // outstanding request value can be stored before decrementing the active threads count.
107     //
108     // But this technique may cause two other problems. When the stored request is
109     // zero, it is possible that the arena still has threads that can generate new
110     // tasks and thus re-establish a non-zero request. Then all the threads can be
111     // revoked (as described above), leaving this thread the last one and causing
112     // it to destroy a non-empty arena.
113     //
114     // The other problem takes place when the stored request is non-zero. Another
115     // thread may complete the work, set the arena state to empty, and leave without
116     // destroying the arena before this thread decrements the refcount. This thread
117     // then cannot destroy the arena either. Thus the arena may be "orphaned".
118     //
119     // In both cases we cannot dereference arena pointer after the refcount is
120     // decremented, as our arena may already be destroyed.
121     //
122     // If this is the external thread, the market is protected by the refcount this
123     // thread holds to it. In the case of workers, the market's liveness is ensured
124     // by the RML connection rundown protocol, according to which the client (i.e. the
125     // market) lives until the RML server notifies it about connection termination, and
126     // this notification is fired only after all workers have returned to RML.
127     //
128     // Thus, if we decremented the refcount to zero, we ask the market to check the
129     // arena state (including whether it is still alive) under the lock.
130     //
131 
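    // Illustrative timeline of the race described above (T1 = thread leaving the arena,
    // T2 = another worker), assuming no extra synchronization:
    //   T1: reads the outstanding request value, decrements my_references
    //   T2: finishes the remaining work, marks the arena EMPTY, decrements my_references,
    //       sees zero references and destroys the arena
    //   T1: must not touch `this` anymore; hence the snapshot + try_destroy_client protocol below.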
132     __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");
133 
134     // When there are no workers, someone must free the arena, as
135     // without workers, no one calls out_of_work().
136     if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
137         out_of_work();
138     }
139 
140     threading_control* tc = my_threading_control;
141     auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
142     // Release our reference to sync with destroy_client
143     unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
144     // Do not access `this` below this point; it might already be destroyed
145     if (remaining_ref == 0) {
146         if (tc->try_destroy_client(tc_client_snapshot)) {
147             // We are requested to destroy the arena ourselves
148             free_arena();
149         }
150     }
151 }
152 
153 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
154     if ( lower >= upper ) return out_of_arena;
155     // Start search for an empty slot from the one we occupied the last time
156     std::size_t index = tls.my_arena_index;
157     if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
158     __TBB_ASSERT( index >= lower && index < upper, nullptr);
159     // Find a free slot
160     for ( std::size_t i = index; i < upper; ++i )
161         if (my_slots[i].try_occupy()) return i;
162     for ( std::size_t i = lower; i < index; ++i )
163         if (my_slots[i].try_occupy()) return i;
164     return out_of_arena;
165 }
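// Worked example (illustrative): with lower == 1, upper == 8 and a starting index of 5,
// the two loops above probe slots 5, 6, 7 and then wrap around to 1, 2, 3, 4, returning the
// first slot whose try_occupy() succeeds, or out_of_arena if every slot is already taken.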
166 
167 template <bool as_worker>
168 std::size_t arena::occupy_free_slot(thread_data& tls) {
169     // Firstly, external threads try to occupy reserved slots
170     std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls,  0, my_num_reserved_slots );
171     if ( index == out_of_arena ) {
172         // Secondly, all threads try to occupy all non-reserved slots
173         index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
174         // Likely this arena is already saturated
175         if ( index == out_of_arena )
176             return out_of_arena;
177     }
178 
179     atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
180     return index;
181 }
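// Illustrative: for an arena created with num_slots == 8 and num_reserved_slots == 2, an
// external thread first competes for the reserved range [0, 2) and then for [2, 8), while a
// worker (as_worker == true) competes only for [2, 8). On success, my_limit is raised
// monotonically via atomic_update() so that the newly occupied slot is covered by task search.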
182 
183 std::uintptr_t arena::calculate_stealing_threshold() {
184     stack_anchor_type anchor;
185     return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
186 }
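// Note: the local stack_anchor_type object above is used only for its address, which
// approximates the top of the current thread's stack; r1::calculate_stealing_threshold()
// derives from it (and from the configured worker stack size) the stack depth below which
// the dispatcher stops stealing to avoid stack overflow.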
187 
188 void arena::process(thread_data& tls) {
189     governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
190     __TBB_ASSERT( is_alive(my_guard), nullptr);
191     __TBB_ASSERT( my_num_slots >= 1, nullptr);
192 
193     std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
194     if (index == out_of_arena) {
195         on_thread_leaving(ref_worker);
196         return;
197     }
198 
199     __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
200     tls.attach_arena(*this, index);
201     // The worker thread enters the dispatch loop to look for work
202     tls.my_inbox.set_is_idle(true);
203     if (tls.my_arena_slot->is_task_pool_published()) {
204         tls.my_inbox.set_is_idle(false);
205     }
206 
207     task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
208     tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
209     __TBB_ASSERT(task_disp.can_steal(), nullptr);
210 
211     __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
212     my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
213 
214     // Waiting on special object tied to this arena
215     outermost_worker_waiter waiter(*this);
216     d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
217     // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
218     // attached to it.
219     tls.my_inbox.set_is_idle(true);
220 
221     __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
222     __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
223     __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);
224 
225     my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
226     tls.my_last_observer = nullptr;
227 
228     tls.leave_task_dispatcher();
229 
230     // Arena slot detach (arena may be used in market::process)
231     // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
232     tls.my_arena_slot->release();
233     tls.my_arena_slot = nullptr;
234     tls.my_inbox.detach();
235     __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
236     __TBB_ASSERT(is_alive(my_guard), nullptr);
237 
238     // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
239     // that arena may be temporarily left unpopulated by threads. See comments in
240     // arena::on_thread_leaving() for more details.
241     on_thread_leaving(ref_worker);
242     __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
243 }
244 
245 arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
246     __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
247     __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
248     __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
249     my_threading_control = control;
250     my_limit = 1;
251     // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
252     my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
253     my_num_reserved_slots = num_reserved_slots;
254     my_max_num_workers = num_slots-num_reserved_slots;
255     my_priority_level = priority_level;
256     my_references = ref_external; // accounts for the external thread
257     my_observers.my_arena = this;
258     my_co_cache.init(4 * num_slots);
259     __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
260     // Initialize the default context. It should be allocated before task_dispatcher construction.
261     my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
262         d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
263     // Construct slots. Mark internal synchronization elements for the tools.
264     task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
265     for( unsigned i = 0; i < my_num_slots; ++i ) {
266         // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
267         __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
268         __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
269         mailbox(i).construct();
270         my_slots[i].init_task_streams(i);
271         my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
272         my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
273     }
274     my_fifo_task_stream.initialize(my_num_slots);
275     my_resume_task_stream.initialize(my_num_slots);
276 #if __TBB_PREVIEW_CRITICAL_TASKS
277     my_critical_task_stream.initialize(my_num_slots);
278 #endif
279     my_mandatory_requests = 0;
280 }
281 
282 arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
283                               unsigned priority_level)
284 {
285     __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
286     __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
287     __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
288     std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
289     unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
290     // Zero all slots to indicate that they are empty
291     std::memset( storage, 0, n );
292 
293     return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
294         arena(control, num_slots, num_reserved_slots, priority_level);
295 }
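// Resulting layout of the single cache-aligned allocation (as implied by the placement-new
// offset above, the base_td_pointer arithmetic in the constructor, and free_arena(), which
// recovers the storage pointer via &mailbox(my_num_slots - 1)), with N == num_arena_slots(...):
//
//   [ N mail_outbox objects ][ arena: arena_base followed by N arena_slot objects ][ N task_dispatcher objects ]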
296 
297 void arena::free_arena () {
298     __TBB_ASSERT( is_alive(my_guard), nullptr);
299     __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
300     __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
301     __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
302 #if __TBB_ARENA_BINDING
303     if (my_numa_binding_observer != nullptr) {
304         destroy_binding_observer(my_numa_binding_observer);
305         my_numa_binding_observer = nullptr;
306     }
307 #endif /*__TBB_ARENA_BINDING*/
308     poison_value( my_guard );
309     for ( unsigned i = 0; i < my_num_slots; ++i ) {
310         // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
311         // TODO: understand the assertion and modify
312         // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
313         __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
314         my_slots[i].free_task_pool();
315         mailbox(i).drain();
316         my_slots[i].my_default_task_dispatcher->~task_dispatcher();
317     }
318     __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
319     __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
320     // Cleanup coroutines/schedulers cache
321     my_co_cache.cleanup();
322     my_default_ctx->~task_group_context();
323     cache_aligned_deallocate(my_default_ctx);
324 #if __TBB_PREVIEW_CRITICAL_TASKS
325     __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
326 #endif
327     // clear() enforces synchronization with observe(false)
328     my_observers.clear();
329 
330     void* storage  = &mailbox(my_num_slots-1);
331     __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
332     this->~arena();
333 #if TBB_USE_ASSERT > 1
334     std::memset( storage, 0, allocation_size(my_num_slots) );
335 #endif /* TBB_USE_ASSERT */
336     cache_aligned_deallocate( storage );
337 }
338 
339 bool arena::has_enqueued_tasks() {
340     return !my_fifo_task_stream.empty();
341 }
342 
343 void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
344     my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);
345 
346     if (wakeup_threads) {
347         // Notify all sleeping threads that work has appeared in the arena.
348         get_waiting_threads_monitor().notify([&] (market_context context) {
349             return this == context.my_arena_addr;
350         });
351     }
352 }
353 
354 bool arena::has_tasks() {
355     // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
356     std::size_t n = my_limit.load(std::memory_order_acquire);
357     bool tasks_are_available = false;
358     for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
359         tasks_are_available = !my_slots[k].is_empty();
360     }
361     tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
362 #if __TBB_PREVIEW_CRITICAL_TASKS
363     tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
364 #endif
365     return tasks_are_available;
366 }
367 
368 void arena::out_of_work() {
369     // We should try to clear my_mandatory_concurrency before my_pool_state to keep the arena invariants consistent.
370     // Otherwise, we might transiently have my_pool_state == false and my_mandatory_concurrency == true, which is a broken invariant.
371     bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
372     bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });
373 
374     if (disable_mandatory || release_workers) {
375         int mandatory_delta = disable_mandatory ? -1 : 0;
376         int workers_delta = release_workers ? -(int)my_max_num_workers : 0;
377 
378         if (disable_mandatory && is_arena_workerless()) {
379             // We set workers_delta to 1 when mandatory concurrency was enabled, so revert it now
380             workers_delta = -1;
381         }
382         request_workers(mandatory_delta, workers_delta);
383     }
384 }
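// Example (illustrative): if both flags were successfully cleared in an arena with
// my_max_num_workers == 4, the call above becomes
// request_workers(/*mandatory_delta*/ -1, /*workers_delta*/ -4). For a workerless arena
// (my_max_num_workers == 0) workers_delta is forced to -1 instead, reverting the +1 that
// was added when mandatory concurrency was enabled.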
385 
386 void arena::set_top_priority(bool is_top_priority) {
387     my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
388 }
389 
390 bool arena::is_top_priority() const {
391     return my_is_top_priority.load(std::memory_order_relaxed);
392 }
393 
394 bool arena::try_join() {
395     if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
396         my_references += arena::ref_worker;
397         return true;
398     }
399     return false;
400 }
401 
402 void arena::set_allotment(unsigned allotment) {
403     if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
404         my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
405     }
406 }
407 
408 std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
409     __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);
410 
411     int min_workers_request = 0;
412     int max_workers_request = 0;
413 
414     // Calculate min request
415     my_mandatory_requests += mandatory_delta;
416     min_workers_request = my_mandatory_requests > 0 ? 1 : 0;
417 
418     // Calculate max request
419     my_total_num_workers_requested += workers_delta;
420     // Clamp worker request into interval [0, my_max_num_workers]
421     max_workers_request = clamp(my_total_num_workers_requested, 0,
422         min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);
423 
424     return { min_workers_request, max_workers_request };
425 }
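// Worked example (illustrative): an arena with my_max_num_workers == 4,
// my_total_num_workers_requested == 3 and no outstanding mandatory requests receives
// update_request(/*mandatory_delta*/ 1, /*workers_delta*/ 2):
//   min_workers_request = 1   (one mandatory request is now outstanding)
//   my_total_num_workers_requested = 5, clamped to my_max_num_workers -> max_workers_request = 4
// For a workerless arena the upper clamp bound becomes 1 while a mandatory request is active.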
426 
427 thread_control_monitor& arena::get_waiting_threads_monitor() {
428     return my_threading_control->get_waiting_threads_monitor();
429 }
430 
431 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
432     task_group_context_impl::bind_to(ctx, &td);
433     task_accessor::context(t) = &ctx;
434     task_accessor::isolation(t) = no_isolation;
435     my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
436     advertise_new_work<work_enqueued>();
437 }
438 
439 arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level)
440 {
441     __TBB_ASSERT(num_slots > 0, nullptr);
442     __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
443     // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange).
444     arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
445     a.my_tc_client = control->create_client(a);
446     // We should not publish arena until all fields are initialized
447     control->publish_client(a.my_tc_client);
448     return a;
449 }
450 
451 } // namespace r1
452 } // namespace detail
453 } // namespace tbb
454 
455 // Enable task_arena.h
456 #include "oneapi/tbb/task_arena.h" // task_arena_base
457 
458 namespace tbb {
459 namespace detail {
460 namespace r1 {
461 
462 #if TBB_USE_ASSERT
463 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
464     bool is_arena_priority_correct =
465         a_priority == tbb::task_arena::priority::high   ||
466         a_priority == tbb::task_arena::priority::normal ||
467         a_priority == tbb::task_arena::priority::low;
468     __TBB_ASSERT( is_arena_priority_correct,
469                   "Task arena priority should be equal to one of the predefined values." );
470 }
471 #else
472 void assert_arena_priority_valid( tbb::task_arena::priority ) {}
473 #endif
474 
475 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
476     assert_arena_priority_valid( a_priority );
477     return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
478 }
479 
480 tbb::task_arena::priority arena_priority( unsigned priority_level ) {
481     auto priority = tbb::task_arena::priority(
482         (d1::num_priority_levels - priority_level) * d1::priority_stride
483     );
484     assert_arena_priority_valid( priority );
485     return priority;
486 }
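// Illustrative mapping, assuming the public enumerators are defined as multiples of
// d1::priority_stride (low = 1x, normal = 2x, high = 3x) and d1::num_priority_levels == 3:
//   arena_priority_level(priority::high)   -> 3 - 3 == 0   (level 0 is the highest priority)
//   arena_priority_level(priority::normal) -> 3 - 2 == 1
//   arena_priority_level(priority::low)    -> 3 - 1 == 2
// arena_priority() performs the inverse transformation.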
487 
488 struct task_arena_impl {
489     static void initialize(d1::task_arena_base&);
490     static void terminate(d1::task_arena_base&);
491     static bool attach(d1::task_arena_base&);
492     static void execute(d1::task_arena_base&, d1::delegate_base&);
493     static void wait(d1::task_arena_base&);
494     static int max_concurrency(const d1::task_arena_base*);
495     static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
496 };
497 
498 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
499     task_arena_impl::initialize(ta);
500 }
501 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
502     task_arena_impl::terminate(ta);
503 }
504 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
505     return task_arena_impl::attach(ta);
506 }
507 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
508     task_arena_impl::execute(ta, d);
509 }
510 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
511     task_arena_impl::wait(ta);
512 }
513 
514 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
515     return task_arena_impl::max_concurrency(ta);
516 }
517 
518 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
519     task_arena_impl::enqueue(t, nullptr, ta);
520 }
521 
522 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
523     task_arena_impl::enqueue(t, &ctx, ta);
524 }
525 
526 void task_arena_impl::initialize(d1::task_arena_base& ta) {
527     // Enforce global market initialization to properly initialize soft limit
528     (void)governor::get_thread_data();
529     if (ta.my_max_concurrency < 1) {
530 #if __TBB_ARENA_BINDING
531         d1::constraints arena_constraints = d1::constraints{}
532             .set_core_type(ta.core_type())
533             .set_max_threads_per_core(ta.max_threads_per_core())
534             .set_numa_id(ta.my_numa_id);
535         ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
536 #else /*!__TBB_ARENA_BINDING*/
537         ta.my_max_concurrency = (int)governor::default_num_threads();
538 #endif /*!__TBB_ARENA_BINDING*/
539     }
540 
541     __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
542     unsigned priority_level = arena_priority_level(ta.my_priority);
543     threading_control* thr_control = threading_control::register_public_reference();
544     arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level);
545     ta.my_arena.store(&a, std::memory_order_release);
546 #if __TBB_CPUBIND_PRESENT
547     a.my_numa_binding_observer = construct_binding_observer(
548         static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
549 #endif /*__TBB_CPUBIND_PRESENT*/
550 }
551 
552 void task_arena_impl::terminate(d1::task_arena_base& ta) {
553     arena* a = ta.my_arena.load(std::memory_order_relaxed);
554     assert_pointer_valid(a);
555     threading_control::unregister_public_reference(/*blocking_terminate=*/false);
556     a->on_thread_leaving(arena::ref_external);
557     ta.my_arena.store(nullptr, std::memory_order_relaxed);
558 }
559 
560 bool task_arena_impl::attach(d1::task_arena_base& ta) {
561     __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
562     thread_data* td = governor::get_thread_data_if_initialized();
563     if( td && td->my_arena ) {
564         arena* a = td->my_arena;
565         // There is an active arena to attach to.
566         // It is still used by the calling thread, so it won't be destroyed right away.
567         __TBB_ASSERT(a->my_references > 0, nullptr);
568         a->my_references += arena::ref_external;
569         ta.my_num_reserved_slots = a->my_num_reserved_slots;
570         ta.my_priority = arena_priority(a->my_priority_level);
571         ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
572         __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
573         ta.my_arena.store(a, std::memory_order_release);
574         // increases threading_control's ref count for task_arena
575         threading_control::register_public_reference();
576         return true;
577     }
578     return false;
579 }
580 
581 void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
582     thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
583     assert_pointer_valid(td, "thread_data pointer should not be null");
584     arena* a = ta ?
585               ta->my_arena.load(std::memory_order_relaxed)
586             : td->my_arena
587     ;
588     assert_pointer_valid(a, "arena pointer should not be null");
589     auto* ctx = c ? c : a->my_default_ctx;
590     assert_pointer_valid(ctx, "context pointer should not be null");
591     // Is there a better place for checking the state of ctx?
592      __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
593     __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
594                  "The task will not be executed because its task_group_context is cancelled.");
595     a->enqueue_task(t, *ctx, *td);
596 
597 class nested_arena_context : no_copy {
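// RAII helper used by task_arena_impl::execute() below: on construction it moves the calling
// thread into a slot of the target arena (detaching it from its original arena and task
// dispatcher if needed) and adjusts the worker demand; on destruction it restores the original
// arena, slot, dispatcher and observer state.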
598 public:
599     nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
600         : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
601     {
602         if (td.my_arena != &nested_arena) {
603             m_orig_arena = td.my_arena;
604             m_orig_slot_index = td.my_arena_index;
605             m_orig_last_observer = td.my_last_observer;
606 
607             td.detach_task_dispatcher();
608             td.attach_arena(nested_arena, slot_index);
609             if (td.my_inbox.is_idle_state(true))
610                 td.my_inbox.set_is_idle(false);
611             task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
612             td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);
613 
614             // If the calling thread occupies a slot outside the external thread reserve, we need to notify the
615             // market that this arena requires one worker less.
616             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
617                 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
618             }
619 
620             td.my_last_observer = nullptr;
621             // The task_arena::execute method considers each calling thread as an external thread.
622             td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
623         }
624 
625         m_task_dispatcher = td.my_task_dispatcher;
626         m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
627         m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
628         m_task_dispatcher->m_properties.critical_task_allowed = true;
629 
630         execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
631         ed_ext.context = td.my_arena->my_default_ctx;
632         ed_ext.original_slot = td.my_arena_index;
633         ed_ext.affinity_slot = d1::no_slot;
634         ed_ext.task_disp = td.my_task_dispatcher;
635         ed_ext.isolation = no_isolation;
636 
637         __TBB_ASSERT(td.my_arena_slot, nullptr);
638         __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
639         __TBB_ASSERT(td.my_task_dispatcher, nullptr);
640     }
641     ~nested_arena_context() {
642         thread_data& td = *m_task_dispatcher->m_thread_data;
643         __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
644         m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
645         m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
646         if (m_orig_arena) {
647             td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
648             td.my_last_observer = m_orig_last_observer;
649 
650             // Notify the market that this thread is releasing a slot
651             // that can be used by a worker thread.
652             if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
653                 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
654             }
655 
656             td.leave_task_dispatcher();
657             td.my_arena_slot->release();
658             td.my_arena->my_exit_monitors.notify_one(); // do not relax!
659 
660             td.attach_arena(*m_orig_arena, m_orig_slot_index);
661             td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
662             __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
663         }
664         td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
665     }
666 
667 private:
668     execution_data_ext    m_orig_execute_data_ext{};
669     arena*              m_orig_arena{ nullptr };
670     observer_proxy*     m_orig_last_observer{ nullptr };
671     task_dispatcher*    m_task_dispatcher{ nullptr };
672     unsigned            m_orig_slot_index{};
673     bool                m_orig_fifo_tasks_allowed{};
674     bool                m_orig_critical_task_allowed{};
675 };
676 
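// Task used by task_arena_impl::execute() when the calling thread cannot occupy a slot in the
// target arena: the delegate is wrapped into this task and enqueued, and the caller blocks on
// m_monitor/m_wait_ctx until some thread of the arena executes (or cancels) it.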
677 class delegated_task : public d1::task {
678     d1::delegate_base&  m_delegate;
679     concurrent_monitor& m_monitor;
680     d1::wait_context&   m_wait_ctx;
681     std::atomic<bool>   m_completed;
682     d1::task* execute(d1::execution_data& ed) override {
683         const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
684         execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
685         __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
686             "The execute data shall point to the current task dispatcher execute data");
687         __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);
688 
689         ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
690         bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
691         try_call([&] {
692             m_delegate();
693         }).on_completion([&] {
694             ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
695             ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
696         });
697 
698         finalize();
699         return nullptr;
700     }
701     d1::task* cancel(d1::execution_data&) override {
702         finalize();
703         return nullptr;
704     }
705     void finalize() {
706         m_wait_ctx.release(); // must precede the wakeup
707         m_monitor.notify([this] (std::uintptr_t ctx) {
708             return ctx == std::uintptr_t(&m_delegate);
709         }); // do not relax, it needs a fence!
710         m_completed.store(true, std::memory_order_release);
711     }
712 public:
713     delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
714         : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
715     ~delegated_task() override {
716         // The destructor can run before the m_monitor notification completes,
717         // because the waiting thread can be released right after m_wait_ctx.release().
718         // To close that race we wait for the m_completed signal.
719         spin_wait_until_eq(m_completed, true);
720     }
721 };
722 
723 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
724     arena* a = ta.my_arena.load(std::memory_order_relaxed);
725     __TBB_ASSERT(a != nullptr, nullptr);
726     thread_data* td = governor::get_thread_data();
727 
728     bool same_arena = td->my_arena == a;
729     std::size_t index1 = td->my_arena_index;
730     if (!same_arena) {
731         index1 = a->occupy_free_slot</*as_worker */false>(*td);
732         if (index1 == arena::out_of_arena) {
733             concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
734             d1::wait_context wo(1);
735             d1::task_group_context exec_context(d1::task_group_context::isolated);
736             task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);
737 
738             delegated_task dt(d, a->my_exit_monitors, wo);
739             a->enqueue_task( dt, exec_context, *td);
740             std::size_t index2 = arena::out_of_arena;
741             do {
742                 a->my_exit_monitors.prepare_wait(waiter);
743                 if (!wo.continue_execution()) {
744                     a->my_exit_monitors.cancel_wait(waiter);
745                     break;
746                 }
747                 index2 = a->occupy_free_slot</*as_worker*/false>(*td);
748                 if (index2 != arena::out_of_arena) {
749                     a->my_exit_monitors.cancel_wait(waiter);
750                     nested_arena_context scope(*td, *a, index2 );
751                     r1::wait(wo, exec_context);
752                     __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
753                     break;
754                 }
755                 a->my_exit_monitors.commit_wait(waiter);
756             } while (wo.continue_execution());
757             if (index2 == arena::out_of_arena) {
758                 // Notify a waiting thread even if this thread did not enter the arena,
759                 // in case it was woken by a leaving thread but did not need to enter.
760                 a->my_exit_monitors.notify_one(); // do not relax!
761             }
762             // process possible exception
763             auto exception = exec_context.my_exception.load(std::memory_order_acquire);
764             if (exception) {
765                 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
766                 exception->throw_self();
767             }
768             __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
769             return;
770         } // if (index1 == arena::out_of_arena)
771     } // if (!same_arena)
772 
773     context_guard_helper</*report_tasks=*/false> context_guard;
774     context_guard.set_ctx(a->my_default_ctx);
775     nested_arena_context scope(*td, *a, index1);
776 #if _WIN64
777     try {
778 #endif
779         d();
780         __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
781 #if _WIN64
782     } catch (...) {
783         context_guard.restore_default();
784         throw;
785     }
786 #endif
787 }
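// Public-API view of the above (illustrative sketch; heavy_computation is a placeholder):
//
//   tbb::task_arena a(4);
//   int result = 0;
//   a.execute([&] { result = heavy_computation(); });
//
// If the calling thread obtains a slot, the delegate runs on the caller's stack inside the
// arena; otherwise it is wrapped into a delegated_task, enqueued, and the caller waits for
// its completion (possibly joining the arena later if a slot frees up).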
788 
789 void task_arena_impl::wait(d1::task_arena_base& ta) {
790     arena* a = ta.my_arena.load(std::memory_order_relaxed);
791     __TBB_ASSERT(a != nullptr, nullptr);
792     thread_data* td = governor::get_thread_data();
793     __TBB_ASSERT_EX(td, "Scheduler is not initialized");
794     __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
795     if (a->my_max_num_workers != 0) {
796         while (a->num_workers_active() || !a->is_empty()) {
797             yield();
798         }
799     }
800 }
801 
802 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
803     arena* a = nullptr;
804     if( ta ) // for special cases of ta->max_concurrency()
805         a = ta->my_arena.load(std::memory_order_relaxed);
806     else if( thread_data* td = governor::get_thread_data_if_initialized() )
807         a = td->my_arena; // the current arena if any
808 
809     if( a ) { // Get parameters from the arena
810         __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
811         int mandatory_worker = 0;
812         if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
813             mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
814         }
815         return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
816     }
817 
818     if (ta && ta->my_max_concurrency == 1) {
819         return 1;
820     }
821 
822 #if __TBB_ARENA_BINDING
823     if (ta) {
824         d1::constraints arena_constraints = d1::constraints{}
825             .set_numa_id(ta->my_numa_id)
826             .set_core_type(ta->core_type())
827             .set_max_threads_per_core(ta->max_threads_per_core());
828         return (int)default_concurrency(arena_constraints);
829     }
830 #endif /*!__TBB_ARENA_BINDING*/
831 
832     __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
833     return int(governor::default_num_threads());
834 }
835 
836 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
837     // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
838     thread_data* tls = governor::get_thread_data();
839     assert_pointers_valid(tls, tls->my_task_dispatcher);
840     task_dispatcher* dispatcher = tls->my_task_dispatcher;
841     isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
842     try_call([&] {
843         // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
844         isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
845         // Save the current isolation value and set new one
846         previous_isolation = dispatcher->set_isolation(current_isolation);
847         // Isolation within this callable
848         d();
849     }).on_completion([&] {
850         __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
851         dispatcher->set_isolation(previous_isolation);
852     });
853 }
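// Public-API view (illustrative): this function backs tbb::this_task_arena::isolate(), e.g.
//
//   tbb::this_task_arena::isolate([] {
//       tbb::parallel_for(0, 100, [](int) { /* tasks spawned here get their own isolation tag */ });
//   });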
854 
855 } // namespace r1
856 } // namespace detail
857 } // namespace tbb
858