/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_ARENA_BINDING*/

void arena::on_thread_leaving(unsigned ref_param) {
    //
    // The implementation of the arena destruction synchronization logic contained various
    // bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the current design.
    //
    // In case of using fire-and-forget tasks (scheduled via task::enqueue())
    // an external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when both the last
    // thread is leaving it and the arena's state is EMPTY (that is, its external thread
    // left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing workers migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may arrive,
    // finish the remaining work, set the arena state to empty, and leave, decrementing its
    // refcount and destroying the arena. Then the current thread would destroy the arena
    // a second time. To preclude this, a local copy of the outstanding request
    // value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above) leaving this thread the last one, and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by the refcount held to it.
    // In case of workers, the market's liveness is ensured by the RML connection
    // rundown protocol, according to which the client (i.e. the market) lives
    // until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return to RML.
    //
    // Thus if we decremented the refcount to zero, we ask the market to check the arena
    // state (including whether it is still alive) under the lock.
    //

    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");

    // When there are no workers someone must free the arena, as
    // without workers no one calls out_of_work().
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // Do not access `this`; it might be destroyed already.
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourselves
            free_arena();
        }
    }
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
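    // Scan circularly: try [index, upper) first, then wrap around and try [lower, index).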
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

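    // Register this worker thread with the permit manager client of this arena's threading control client.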
    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
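    // The task dispatchers share the arena's allocation: they are laid out immediately after the slot array.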
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                             unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

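    // The mail outboxes occupy the beginning of the allocation; the arena object itself is constructed right after them.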
    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // Try to clear my_mandatory_concurrency before my_pool_state to keep the arena invariants consistent.
    // Otherwise, we might end up with my_pool_state == false while my_mandatory_concurrency == true, which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We had set workers_delta to 1 when mandatory concurrency was enabled, so revert it now
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

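// A worker may join only while the number of active workers is below the current allotment;
// on success the worker reference is taken immediately.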
bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

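// Stores the new allotment and reports the signed change relative to the previous value to the caller.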
int arena::update_concurrency(unsigned allotment) {
    int delta = allotment - my_num_workers_allotted.load(std::memory_order_relaxed);
    if (delta != 0) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
    return delta;
}

std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate max request
    my_total_num_workers_requested += workers_delta;
    // Clamp worker request into interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}


void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add a public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // We should not publish the arena until all its fields are initialized
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}


struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}


void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

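    // A non-positive my_max_concurrency means "automatic": derive it from the arena constraints,
    // or fall back to the global default number of threads when arena binding support is unavailable.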
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by td, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // increases the threading_control's ref count for the task_arena
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

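// RAII helper that temporarily re-attaches the calling thread to another arena and its task dispatcher
// for the duration of task_arena::execute(), restoring the original arena, slot, and dispatcher on exit.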
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside of the external thread reserve, we need to notify the
            // market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

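// Wraps a delegate submitted through task_arena::execute() by a thread that could not occupy an arena slot:
// the task runs the delegate inside the arena and then wakes the submitting thread via the arena's exit monitor.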
class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called earlier than the m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
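            // No free slot in the target arena: enqueue the delegate as a task and wait on the arena's
            // exit monitor, retrying to occupy a slot so this thread can help execute the work itself.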
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            std::size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // an exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // Process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base* ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
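        // A workerless arena with a single reserved slot may still be served by one mandatory worker
        // while mandatory concurrency is enabled, so account for it here.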
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb