/*
    Copyright (c) 2020-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev #include "task_dispatcher.h"
1851c0b2f7Stbbdev #include "waiters.h"
1951c0b2f7Stbbdev
2051c0b2f7Stbbdev namespace tbb {
2151c0b2f7Stbbdev namespace detail {
2251c0b2f7Stbbdev namespace r1 {
2351c0b2f7Stbbdev
spawn_and_notify(d1::task & t,arena_slot * slot,arena * a)2451c0b2f7Stbbdev static inline void spawn_and_notify(d1::task& t, arena_slot* slot, arena* a) {
2551c0b2f7Stbbdev slot->spawn(t);
2651c0b2f7Stbbdev a->advertise_new_work<arena::work_spawned>();
2751c0b2f7Stbbdev // TODO: TBB_REVAMP_TODO slot->assert_task_pool_valid();
2851c0b2f7Stbbdev }
2951c0b2f7Stbbdev
spawn(d1::task & t,d1::task_group_context & ctx)3051c0b2f7Stbbdev void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx) {
3151c0b2f7Stbbdev thread_data* tls = governor::get_thread_data();
3251c0b2f7Stbbdev task_group_context_impl::bind_to(ctx, tls);
3351c0b2f7Stbbdev arena* a = tls->my_arena;
3451c0b2f7Stbbdev arena_slot* slot = tls->my_arena_slot;
3551c0b2f7Stbbdev // Capture current context
3651c0b2f7Stbbdev task_accessor::context(t) = &ctx;
3751c0b2f7Stbbdev // Mark isolation
3851c0b2f7Stbbdev task_accessor::isolation(t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
3951c0b2f7Stbbdev spawn_and_notify(t, slot, a);
4051c0b2f7Stbbdev }
4151c0b2f7Stbbdev
// Spawn task t, optionally affinitized to arena slot `id`.
// If `id` names a valid slot other than the caller's own, a task_proxy is
// created, mailed to the destination slot's inbox and also spawned into the
// local task pool, so the task can be taken from either location.
void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    execution_data_ext& ed = tls->my_task_dispatcher->m_execute_data_ext;

    // Capture context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = ed.isolation;

    // Affinity is honored only for a valid slot that is not the caller's own;
    // otherwise fall back to a plain local spawn.
    if ( id != d1::no_slot && id != tls->my_arena_index && id < a->my_num_slots) {
        // Allocate proxy task
        d1::small_object_allocator alloc{};
        auto proxy = alloc.new_object<task_proxy>(static_cast<d1::execution_data&>(ed));
        // Mark as a proxy
        task_accessor::set_proxy_trait(*proxy);
        // Mark isolation for the proxy task
        task_accessor::isolation(*proxy) = ed.isolation;
        // Deallocation hint (tls) from the task allocator
        proxy->allocator = alloc;
        proxy->slot = id;
        proxy->outbox = &a->mailbox(id);
        // Mark proxy as present in both locations (sender's task pool and destination mailbox)
        proxy->task_and_tag = intptr_t(&t) | task_proxy::location_mask;
        // Mail the proxy - after this point t may be destroyed by another thread at any moment.
        proxy->outbox->push(proxy);
        // Spawn proxy to the local task pool
        spawn_and_notify(*proxy, slot, a);
    } else {
        spawn_and_notify(t, slot, a);
    }
}
7651c0b2f7Stbbdev
submit(d1::task & t,d1::task_group_context & ctx,arena * a,std::uintptr_t as_critical)7751c0b2f7Stbbdev void __TBB_EXPORTED_FUNC submit(d1::task& t, d1::task_group_context& ctx, arena* a, std::uintptr_t as_critical) {
7851c0b2f7Stbbdev suppress_unused_warning(as_critical);
7951c0b2f7Stbbdev assert_pointer_valid(a);
8051c0b2f7Stbbdev thread_data& tls = *governor::get_thread_data();
8151c0b2f7Stbbdev
8251c0b2f7Stbbdev // TODO revamp: for each use case investigate neccesity to make this call
8351c0b2f7Stbbdev task_group_context_impl::bind_to(ctx, &tls);
8451c0b2f7Stbbdev task_accessor::context(t) = &ctx;
8551c0b2f7Stbbdev // TODO revamp: consider respecting task isolation if this call is being made by external thread
8651c0b2f7Stbbdev task_accessor::isolation(t) = tls.my_task_dispatcher->m_execute_data_ext.isolation;
8751c0b2f7Stbbdev
8851c0b2f7Stbbdev // TODO: consider code refactoring when lane selection mechanism is unified.
8951c0b2f7Stbbdev
9051c0b2f7Stbbdev if ( tls.is_attached_to(a) ) {
9151c0b2f7Stbbdev arena_slot* slot = tls.my_arena_slot;
9251c0b2f7Stbbdev #if __TBB_PREVIEW_CRITICAL_TASKS
9351c0b2f7Stbbdev if( as_critical ) {
9451c0b2f7Stbbdev a->my_critical_task_stream.push( &t, subsequent_lane_selector(slot->critical_hint()) );
9551c0b2f7Stbbdev } else
9651c0b2f7Stbbdev #endif
9751c0b2f7Stbbdev {
9851c0b2f7Stbbdev slot->spawn(t);
9951c0b2f7Stbbdev }
10051c0b2f7Stbbdev } else {
10151c0b2f7Stbbdev random_lane_selector lane_selector{tls.my_random};
10251c0b2f7Stbbdev #if !__TBB_PREVIEW_CRITICAL_TASKS
10351c0b2f7Stbbdev suppress_unused_warning(as_critical);
10451c0b2f7Stbbdev #else
10551c0b2f7Stbbdev if ( as_critical ) {
10651c0b2f7Stbbdev a->my_critical_task_stream.push( &t, lane_selector );
10751c0b2f7Stbbdev } else
10851c0b2f7Stbbdev #endif
10951c0b2f7Stbbdev {
11051c0b2f7Stbbdev // Avoid joining the arena the thread is not currently in.
11151c0b2f7Stbbdev a->my_fifo_task_stream.push( &t, lane_selector );
11251c0b2f7Stbbdev }
11351c0b2f7Stbbdev }
11451c0b2f7Stbbdev // It is assumed that some thread will explicitly wait in the arena the task is submitted
11551c0b2f7Stbbdev // into. Therefore, no need to utilize mandatory concurrency here.
11651c0b2f7Stbbdev a->advertise_new_work<arena::work_spawned>();
11751c0b2f7Stbbdev }
11851c0b2f7Stbbdev
// Execute task t and block until wait_ctx is released.
// t_ctx is the context the task runs in; w_ctx is the context associated with
// the wait itself (its registered exception, if any, is rethrown to the caller).
void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    task_accessor::context(t) = &t_ctx;
    task_dispatcher::execute_and_wait(&t, wait_ctx, w_ctx);
}
12351c0b2f7Stbbdev
// Block the calling thread until wait_ctx is released, processing available
// work in the meantime.
void __TBB_EXPORTED_FUNC wait(d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Enter the task dispatch loop without a task
    task_dispatcher::execute_and_wait(nullptr, wait_ctx, w_ctx);
}
12851c0b2f7Stbbdev
// Return the arena slot index associated with the given execution data, or,
// when ed is null, the slot index of the calling thread.
// Returns slot_id(-1) if the calling thread has no initialized TLS.
d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data* ed) {
    if (ed == nullptr) {
        // No execution data: consult the calling thread's TLS, if it exists.
        thread_data* td = governor::get_thread_data_if_initialized();
        return td == nullptr ? d1::slot_id(-1) : td->my_arena_index;
    }
    const execution_data_ext* ext = static_cast<const execution_data_ext*>(ed);
    assert_pointers_valid(ext->task_disp, ext->task_disp->m_thread_data);
    return ext->task_disp->m_thread_data->my_arena_index;
}
13951c0b2f7Stbbdev
current_context()14051c0b2f7Stbbdev d1::task_group_context* __TBB_EXPORTED_FUNC current_context() {
14151c0b2f7Stbbdev thread_data* td = governor::get_thread_data();
14251c0b2f7Stbbdev assert_pointers_valid(td, td->my_task_dispatcher);
14351c0b2f7Stbbdev
14451c0b2f7Stbbdev task_dispatcher* task_disp = td->my_task_dispatcher;
14551c0b2f7Stbbdev if (task_disp->m_properties.outermost) {
14651c0b2f7Stbbdev // No one task is executed, so no execute_data.
14751c0b2f7Stbbdev return nullptr;
14851c0b2f7Stbbdev } else {
14951c0b2f7Stbbdev return td->my_task_dispatcher->m_execute_data_ext.context;
15051c0b2f7Stbbdev }
15151c0b2f7Stbbdev }
15251c0b2f7Stbbdev
// Dispatch-loop entry point for a thread that must wait: optionally execute
// task t first, then process available work until wait_ctx is released.
// On exit, rethrows any exception registered in w_ctx.
void task_dispatcher::execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Get an associated task dispatcher
    thread_data* tls = governor::get_thread_data();
    __TBB_ASSERT(tls->my_task_dispatcher != nullptr, nullptr);
    task_dispatcher& local_td = *tls->my_task_dispatcher;

    // TODO: factor out the binding to execute_and_wait_impl
    if (t) {
        task_group_context_impl::bind_to(*task_accessor::context(*t), tls);
        // Propagate the isolation to the task executed without spawn.
        task_accessor::isolation(*t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    }

    // Waiting on special object tied to a waiting thread.
    external_waiter waiter{ *tls->my_arena, wait_ctx };
    t = local_td.local_wait_for_all(t, waiter);
    __TBB_ASSERT_EX(t == nullptr, "External waiter must not leave dispatch loop with a task");

    // The external thread couldn't exit the dispatch loop in an idle state
    if (local_td.m_thread_data->my_inbox.is_idle_state(true)) {
        local_td.m_thread_data->my_inbox.set_is_idle(false);
    }

    // Rethrow the exception (if any) captured by the waited-on context.
    auto exception = w_ctx.my_exception.load(std::memory_order_acquire);
    if (exception) {
        __TBB_ASSERT(w_ctx.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
        exception->throw_self();
    }
}
18251c0b2f7Stbbdev
18351c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
18451c0b2f7Stbbdev
#if _WIN32
// Coroutine entry point. On Windows the underlying mechanism passes a single
// void* argument (the owning task_dispatcher).
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* addr) noexcept
#else
// Coroutine entry point. On non-Windows platforms the dispatcher pointer is
// passed split into two 32-bit halves (hi/lo) and reassembled below.
/* [[noreturn]] */ void co_local_wait_for_all(unsigned hi, unsigned lo) noexcept
#endif
{
#if !_WIN32
    // Reassemble the task_dispatcher address from its two 32-bit halves.
    std::uintptr_t addr = lo;
    // On 32-bit targets the high half must be zero.
    __TBB_ASSERT(sizeof(addr) == 8 || hi == 0, nullptr);
    addr += std::uintptr_t(std::uint64_t(hi) << 32);
#endif
    task_dispatcher& task_disp = *reinterpret_cast<task_dispatcher*>(addr);
    assert_pointers_valid(task_disp.m_thread_data, task_disp.m_thread_data->my_arena);
    task_disp.set_stealing_threshold(task_disp.m_thread_data->my_arena->calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    task_disp.co_local_wait_for_all();
    // This code is unreachable
}
20351c0b2f7Stbbdev
// Body of the coroutine-backed dispatch loop.
/* [[noreturn]] */ void task_dispatcher::co_local_wait_for_all() noexcept {
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    assert_pointer_valid(m_thread_data);

    // Complete the resume operation that transferred control into this coroutine.
    m_suspend_point->finilize_resume();
    // Basically calls the user callback passed to the tbb::task::suspend function
    do_post_resume_action();

    // Endless loop here because coroutine could be reused
    d1::task* resume_task{};
    do {
        arena* a = m_thread_data->my_arena;
        coroutine_waiter waiter(*a);
        // Dispatch work until local_wait_for_all hands back a resume task,
        // signalling that control must transfer to another suspend point.
        resume_task = local_wait_for_all(nullptr, waiter);
        assert_task_valid(resume_task);
        __TBB_ASSERT(this == m_thread_data->my_task_dispatcher, nullptr);

        // Arrange cleanup of this dispatcher after the target is resumed.
        m_thread_data->set_post_resume_action(post_resume_action::cleanup, this);

    } while (resume(static_cast<suspend_point_type::resume_task*>(resume_task)->m_target));
    // This code might be unreachable
}
22651c0b2f7Stbbdev
get_suspend_point()22751c0b2f7Stbbdev d1::suspend_point task_dispatcher::get_suspend_point() {
22851c0b2f7Stbbdev if (m_suspend_point == nullptr) {
22951c0b2f7Stbbdev assert_pointer_valid(m_thread_data);
23051c0b2f7Stbbdev // 0 means that we attach this task dispatcher to the current stack
23151c0b2f7Stbbdev init_suspend_point(m_thread_data->my_arena, 0);
23251c0b2f7Stbbdev }
23351c0b2f7Stbbdev assert_pointer_valid(m_suspend_point);
23451c0b2f7Stbbdev return m_suspend_point;
23551c0b2f7Stbbdev }
init_suspend_point(arena * a,std::size_t stack_size)23651c0b2f7Stbbdev void task_dispatcher::init_suspend_point(arena* a, std::size_t stack_size) {
23751c0b2f7Stbbdev __TBB_ASSERT(m_suspend_point == nullptr, nullptr);
23851c0b2f7Stbbdev m_suspend_point = new(cache_aligned_allocate(sizeof(suspend_point_type)))
23951c0b2f7Stbbdev suspend_point_type(a, stack_size, *this);
24051c0b2f7Stbbdev }
24151c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */
24251c0b2f7Stbbdev } // namespace r1
24351c0b2f7Stbbdev } // namespace detail
24451c0b2f7Stbbdev } // namespace tbb
245