/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_ARENA_BINDING*/
void arena::on_thread_leaving(unsigned ref_param) {
    //
    // The implementation of the arena destruction synchronization logic contained various
    // bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the framework of the
    // current design.
    //
    // In case of using fire-and-forget tasks (scheduled via task::enqueue()),
    // the external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when both the last
    // thread is leaving it and the arena's state is EMPTY (that is, its external thread
    // left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing workers migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may arrive,
    // finish the remaining work, set the arena state to empty, and leave, decrementing its
    // refcount and destroying the arena. Then the current thread would destroy the arena
    // a second time. To preclude this, a local copy of the outstanding request
    // value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above), leaving this thread the last one, and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by the refcount this
    // thread holds to it. In case of workers, the market's liveness is ensured by the
    // RML connection rundown protocol, according to which the client (i.e. the market)
    // lives until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return into RML.
    //
    // Thus, if we decremented the refcount to zero, we ask the market to check the arena
    // state (including the fact of whether it is alive) under the lock.
    //
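    // A minimal illustrative sketch (not part of the implementation) of the resulting
    // "snapshot, release, maybe destroy" pattern that the code below follows; the names
    // mirror the members used here, everything else is simplified:
    //
    //     auto snapshot = tc->prepare_client_destruction(my_tc_client); // safe to use after `this` is gone
    //     if (my_references.fetch_sub(ref_param) == ref_param) {        // we dropped the last reference
    //         if (tc->try_destroy_client(snapshot)) {                   // checked by threading_control under its lock
    //             free_arena();                                         // nobody else can reach `this` anymore
    //         }
    //     }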

    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");

    // When there are no workers, someone must free the arena, as
    // without workers no one calls out_of_work().
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // Do not access `this`; it might already be destroyed.
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourselves
            free_arena();
        }
    }
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}
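// Illustrative only: with lower == 2, upper == 6, and a previously occupied index of 4,
// the probe order above is 4, 5, 2, 3 (a wrap-around scan of [lower, upper) starting at
// the hinted slot); if the hint lies outside the range, a random start inside it is used.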

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots);
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Wait on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
    // that the arena may be temporarily left unpopulated by threads. See the comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                             unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena() {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots - 1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // We should try to unset my_pool_state first in order to keep the arena invariants consistent.
    // Otherwise, we might end up with my_pool_state == false while my_mandatory_concurrency == true, which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We had set workers_delta to 1 when we enabled mandatory concurrency, so revert it now
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

int arena::update_concurrency(unsigned allotment) {
    int delta = allotment - my_num_workers_allotted.load(std::memory_order_relaxed);
    if (delta != 0) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
    return delta;
}

std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate max request
    my_total_num_workers_requested += workers_delta;
    // Clamp worker request into interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}
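// Worked example (illustrative numbers only): for an arena with my_max_num_workers == 4,
// an accumulated my_total_num_workers_requested of 7 is clamped to a max request of 4,
// while a negative total clamps to 0; in a workerless arena (my_max_num_workers == 0) with
// an active mandatory request, the max request is capped at 1 instead.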

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add a public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // We should not publish the arena until all of its fields are initialized.
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}
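// Illustrative mapping, assuming the values defined in the d1 headers (num_priority_levels == 3
// and the priority enumerators being consecutive multiples of priority_stride): priority::high
// maps to level 0, priority::normal to level 1, and priority::low to level 2, with
// arena_priority() performing the inverse conversion. Smaller level values therefore mean
// higher priority; the exact constants are an assumption here, not restated in this file.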

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by the current thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // Increase threading_control's reference count on behalf of the task_arena.
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data();  // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : td->my_arena;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}
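// For orientation only: this is the entry point reached by the exported r1::enqueue functions
// above, which back the public task_arena::enqueue overloads (when ta == nullptr, the calling
// thread's current arena is used); the wrapping of the user callable into a d1::task happens
// in the header layer before control gets here.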

class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need to notify the
            // market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext    m_orig_execute_data_ext{};
    arena*              m_orig_arena{ nullptr };
    observer_proxy*     m_orig_last_observer{ nullptr };
    task_dispatcher*    m_task_dispatcher{ nullptr };
    unsigned            m_orig_slot_index{};
    bool                m_orig_fifo_tasks_allowed{};
    bool                m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base&  m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context&   m_wait_ctx;
    std::atomic<bool>   m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called earlier than the m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base* ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Run the callable under the isolation
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}
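// For reference, and as an assumption about the header layer rather than something shown in
// this file: the public entry point that reaches isolate_within_arena() is
// tbb::this_task_arena::isolate, used roughly like
//     tbb::this_task_arena::isolate([] { /* nested parallel work invisible to outer tasks */ });
// where the wrapping of the lambda into a delegate happens in the headers.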

} // namespace r1
} // namespace detail
} // namespace tbb