xref: /oneTBB/src/tbb/task_dispatcher.cpp (revision e77098d6)
151c0b2f7Stbbdev /*
2*e77098d6SPavel Kumbrasev     Copyright (c) 2020-2022 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include "task_dispatcher.h"
1851c0b2f7Stbbdev #include "waiters.h"
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev namespace tbb {
2151c0b2f7Stbbdev namespace detail {
2251c0b2f7Stbbdev namespace r1 {
2351c0b2f7Stbbdev 
// Put the task into the given arena slot's local task pool and advertise new
// work to the arena so that sleeping workers may wake up and steal it.
static inline void spawn_and_notify(d1::task& t, arena_slot* slot, arena* a) {
    slot->spawn(t);
    a->advertise_new_work<arena::work_spawned>();
    // TODO: TBB_REVAMP_TODO slot->assert_task_pool_valid();
}
2951c0b2f7Stbbdev 
spawn(d1::task & t,d1::task_group_context & ctx)3051c0b2f7Stbbdev void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx) {
3151c0b2f7Stbbdev     thread_data* tls = governor::get_thread_data();
3251c0b2f7Stbbdev     task_group_context_impl::bind_to(ctx, tls);
3351c0b2f7Stbbdev     arena* a = tls->my_arena;
3451c0b2f7Stbbdev     arena_slot* slot = tls->my_arena_slot;
3551c0b2f7Stbbdev     // Capture current context
3651c0b2f7Stbbdev     task_accessor::context(t) = &ctx;
3751c0b2f7Stbbdev     // Mark isolation
3851c0b2f7Stbbdev     task_accessor::isolation(t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
3951c0b2f7Stbbdev     spawn_and_notify(t, slot, a);
4051c0b2f7Stbbdev }
4151c0b2f7Stbbdev 
// Spawn a task with affinity to the specified arena slot. When the target id
// is valid and differs from the caller's own slot, the task is delivered via
// a task_proxy published both to the caller's task pool and to the target
// slot's mailbox; whichever side claims the proxy first runs the task.
void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    execution_data_ext& ed = tls->my_task_dispatcher->m_execute_data_ext;

    // Capture context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = ed.isolation;

    // A no_slot, self-targeted, or out-of-range id degenerates to a plain local spawn.
    if ( id != d1::no_slot && id != tls->my_arena_index && id < a->my_num_slots) {
        // Allocate proxy task
        d1::small_object_allocator alloc{};
        auto proxy = alloc.new_object<task_proxy>(static_cast<d1::execution_data&>(ed));
        // Mark as a proxy
        task_accessor::set_proxy_trait(*proxy);
        // Mark isolation for the proxy task
        task_accessor::isolation(*proxy) = ed.isolation;
        // Deallocation hint (tls) from the task allocator
        proxy->allocator = alloc;
        proxy->slot = id;
        proxy->outbox = &a->mailbox(id);
        // Mark proxy as present in both locations (sender's task pool and destination mailbox)
        proxy->task_and_tag = intptr_t(&t) | task_proxy::location_mask;
        // Mail the proxy - after this point t may be destroyed by another thread at any moment.
        proxy->outbox->push(proxy);
        // Spawn proxy to the local task pool
        spawn_and_notify(*proxy, slot, a);
    } else {
        spawn_and_notify(t, slot, a);
    }
}
7651c0b2f7Stbbdev 
// Submit a task to the given arena without requiring the caller to be a part
// of it. If the calling thread is attached to the arena, the task goes to its
// local task pool (or to the critical task stream when as_critical is set and
// the preview feature is enabled); otherwise it is pushed into one of the
// arena's FIFO lanes so the caller does not have to join the arena.
void __TBB_EXPORTED_FUNC submit(d1::task& t, d1::task_group_context& ctx, arena* a, std::uintptr_t as_critical) {
    suppress_unused_warning(as_critical);
    assert_pointer_valid(a);
    thread_data& tls = *governor::get_thread_data();

    // TODO revamp: for each use case investigate necessity to make this call
    task_group_context_impl::bind_to(ctx, &tls);
    task_accessor::context(t) = &ctx;
    // TODO revamp: consider respecting task isolation if this call is being made by external thread
    task_accessor::isolation(t) = tls.my_task_dispatcher->m_execute_data_ext.isolation;

    // TODO: consider code refactoring when lane selection mechanism is unified.

    if ( tls.is_attached_to(a) ) {
        arena_slot* slot = tls.my_arena_slot;
#if __TBB_PREVIEW_CRITICAL_TASKS
        if( as_critical ) {
            a->my_critical_task_stream.push( &t, subsequent_lane_selector(slot->critical_hint()) );
        } else
#endif
        {
            slot->spawn(t);
        }
    } else {
        random_lane_selector lane_selector{tls.my_random};
#if !__TBB_PREVIEW_CRITICAL_TASKS
        suppress_unused_warning(as_critical);
#else
        if ( as_critical ) {
            a->my_critical_task_stream.push( &t, lane_selector );
        } else
#endif
        {
            // Avoid joining the arena the thread is not currently in.
            a->my_fifo_task_stream.push( &t, lane_selector );
        }
    }
    // It is assumed that some thread will explicitly wait in the arena the task is submitted
    // into. Therefore, no need to utilize mandatory concurrency here.
    a->advertise_new_work<arena::work_spawned>();
}
11851c0b2f7Stbbdev 
// Bind the task to its context, execute it, and block until wait_ctx
// reports completion. Thin wrapper over task_dispatcher::execute_and_wait.
void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    task_accessor::context(t) = &t_ctx;
    task_dispatcher::execute_and_wait(&t, wait_ctx, w_ctx);
}
12351c0b2f7Stbbdev 
// Block in the task dispatch loop (processing other available work) until
// wait_ctx reports completion; no task is passed in to execute directly.
void __TBB_EXPORTED_FUNC wait(d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Enter the task dispatch loop without a task
    task_dispatcher::execute_and_wait(nullptr, wait_ctx, w_ctx);
}
12851c0b2f7Stbbdev 
// Report the arena slot index associated with the given execution data, or
// with the calling thread when no execution data is provided. Returns
// d1::slot_id(-1) for a thread with no initialized thread data.
d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data* ed) {
    if (ed == nullptr) {
        // No execution data: fall back to the calling thread, which might
        // not be initialized at all.
        thread_data* td = governor::get_thread_data_if_initialized();
        return td == nullptr ? d1::slot_id(-1) : td->my_arena_index;
    }
    const execution_data_ext* ed_ext = static_cast<const execution_data_ext*>(ed);
    assert_pointers_valid(ed_ext->task_disp, ed_ext->task_disp->m_thread_data);
    return ed_ext->task_disp->m_thread_data->my_arena_index;
}
13951c0b2f7Stbbdev 
current_context()14051c0b2f7Stbbdev d1::task_group_context* __TBB_EXPORTED_FUNC current_context() {
14151c0b2f7Stbbdev     thread_data* td = governor::get_thread_data();
14251c0b2f7Stbbdev     assert_pointers_valid(td, td->my_task_dispatcher);
14351c0b2f7Stbbdev 
14451c0b2f7Stbbdev     task_dispatcher* task_disp = td->my_task_dispatcher;
14551c0b2f7Stbbdev     if (task_disp->m_properties.outermost) {
14651c0b2f7Stbbdev         // No one task is executed, so no execute_data.
14751c0b2f7Stbbdev         return nullptr;
14851c0b2f7Stbbdev     } else {
14951c0b2f7Stbbdev         return td->my_task_dispatcher->m_execute_data_ext.context;
15051c0b2f7Stbbdev     }
15151c0b2f7Stbbdev }
15251c0b2f7Stbbdev 
// Dispatch-loop entry for a waiting thread: optionally execute the given task,
// then process other available work until wait_ctx completes. Rethrows an
// exception registered in the waited-on context w_ctx, if any.
void task_dispatcher::execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Get an associated task dispatcher
    thread_data* tls = governor::get_thread_data();
    __TBB_ASSERT(tls->my_task_dispatcher != nullptr, nullptr);
    task_dispatcher& local_td = *tls->my_task_dispatcher;

    // TODO: factor out the binding to execute_and_wait_impl
    if (t) {
        task_group_context_impl::bind_to(*task_accessor::context(*t), tls);
        // Propagate the isolation to the task executed without spawn.
        task_accessor::isolation(*t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    }

    // Waiting on special object tied to a waiting thread.
    external_waiter waiter{ *tls->my_arena, wait_ctx };
    t = local_td.local_wait_for_all(t, waiter);
    __TBB_ASSERT_EX(t == nullptr, "External waiter must not leave dispatch loop with a task");

    // The external thread couldn't exit the dispatch loop in an idle state
    if (local_td.m_thread_data->my_inbox.is_idle_state(true)) {
        local_td.m_thread_data->my_inbox.set_is_idle(false);
    }

    // Acquire load pairs with the store made by the thread that registered
    // the exception, so the exception object is fully visible here.
    auto exception = w_ctx.my_exception.load(std::memory_order_acquire);
    if (exception) {
        __TBB_ASSERT(w_ctx.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
        exception->throw_self();
    }
}
18251c0b2f7Stbbdev 
18351c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
18451c0b2f7Stbbdev 
// Entry point executed on a coroutine's own stack. On Windows the suspend
// point's task dispatcher arrives as a single pointer argument; on the
// ucontext-based path it is split into two 32-bit halves (makecontext passes
// ints) and is reassembled below before use.
#if _WIN32
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* addr) noexcept
#else
/* [[noreturn]] */ void co_local_wait_for_all(unsigned hi, unsigned lo) noexcept
#endif
{
#if !_WIN32
    // Reassemble the 64-bit address from its two 32-bit halves; on 32-bit
    // targets the high half must be zero.
    std::uintptr_t addr = lo;
    __TBB_ASSERT(sizeof(addr) == 8 || hi == 0, nullptr);
    addr += std::uintptr_t(std::uint64_t(hi) << 32);
#endif
    task_dispatcher& task_disp = *reinterpret_cast<task_dispatcher*>(addr);
    assert_pointers_valid(task_disp.m_thread_data, task_disp.m_thread_data->my_arena);
    task_disp.set_stealing_threshold(task_disp.m_thread_data->my_arena->calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    task_disp.co_local_wait_for_all();
    // This code is unreachable
}
20351c0b2f7Stbbdev 
// Body of the coroutine dispatch loop: finish the switch into this coroutine,
// run the post-resume action, then repeatedly dispatch work until a resume
// task is produced and control is transferred to its target suspend point.
/* [[noreturn]] */ void task_dispatcher::co_local_wait_for_all() noexcept {
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    assert_pointer_valid(m_thread_data);

    // Complete the switch into this coroutine before doing any work.
    m_suspend_point->finilize_resume();
    // Basically calls the user callback passed to the tbb::task::suspend function
    do_post_resume_action();

    // Endless loop here because coroutine could be reused
    d1::task* resume_task{};
    do {
        arena* a = m_thread_data->my_arena;
        coroutine_waiter waiter(*a);
        // Dispatch tasks until the loop hands back a resume task.
        resume_task = local_wait_for_all(nullptr, waiter);
        assert_task_valid(resume_task);
        __TBB_ASSERT(this == m_thread_data->my_task_dispatcher, nullptr);

        // Schedule this coroutine for cleanup once the switch completes.
        m_thread_data->set_post_resume_action(post_resume_action::cleanup, this);

    } while (resume(static_cast<suspend_point_type::resume_task*>(resume_task)->m_target));
    // This code might be unreachable
}
22651c0b2f7Stbbdev 
get_suspend_point()22751c0b2f7Stbbdev d1::suspend_point task_dispatcher::get_suspend_point() {
22851c0b2f7Stbbdev     if (m_suspend_point == nullptr) {
22951c0b2f7Stbbdev         assert_pointer_valid(m_thread_data);
23051c0b2f7Stbbdev         // 0 means that we attach this task dispatcher to the current stack
23151c0b2f7Stbbdev         init_suspend_point(m_thread_data->my_arena, 0);
23251c0b2f7Stbbdev     }
23351c0b2f7Stbbdev     assert_pointer_valid(m_suspend_point);
23451c0b2f7Stbbdev     return m_suspend_point;
23551c0b2f7Stbbdev }
// Construct this dispatcher's suspend point in cache-aligned storage.
// stack_size of 0 means "attach to the current stack" (see get_suspend_point).
void task_dispatcher::init_suspend_point(arena* a, std::size_t stack_size) {
    __TBB_ASSERT(m_suspend_point == nullptr, nullptr);
    m_suspend_point = new(cache_aligned_allocate(sizeof(suspend_point_type)))
        suspend_point_type(a, stack_size, *this);
}
24151c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */
24251c0b2f7Stbbdev } // namespace r1
24351c0b2f7Stbbdev } // namespace detail
24451c0b2f7Stbbdev } // namespace tbb
245