151c0b2f7Stbbdev /* 2b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 1751c0b2f7Stbbdev // Do not include task.h directly. Use scheduler_common.h instead 1851c0b2f7Stbbdev #include "scheduler_common.h" 1951c0b2f7Stbbdev #include "governor.h" 2051c0b2f7Stbbdev #include "arena.h" 2151c0b2f7Stbbdev #include "thread_data.h" 2251c0b2f7Stbbdev #include "task_dispatcher.h" 2351c0b2f7Stbbdev #include "waiters.h" 2451c0b2f7Stbbdev #include "itt_notify.h" 2551c0b2f7Stbbdev 2649e08aacStbbdev #include "oneapi/tbb/detail/_task.h" 2749e08aacStbbdev #include "oneapi/tbb/partitioner.h" 2849e08aacStbbdev #include "oneapi/tbb/task.h" 2951c0b2f7Stbbdev 3051c0b2f7Stbbdev #include <cstring> 3151c0b2f7Stbbdev 3251c0b2f7Stbbdev namespace tbb { 3351c0b2f7Stbbdev namespace detail { 3451c0b2f7Stbbdev namespace r1 { 3551c0b2f7Stbbdev 3651c0b2f7Stbbdev //------------------------------------------------------------------------ 3751c0b2f7Stbbdev // resumable tasks 3851c0b2f7Stbbdev //------------------------------------------------------------------------ 3951c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS 4051c0b2f7Stbbdev 4151c0b2f7Stbbdev void suspend(suspend_callback_type suspend_callback, void* user_callback) { 4251c0b2f7Stbbdev thread_data& td = *governor::get_thread_data(); 4351c0b2f7Stbbdev td.my_task_dispatcher->suspend(suspend_callback, user_callback); 4451c0b2f7Stbbdev // Do not access td after suspend. 4551c0b2f7Stbbdev } 4651c0b2f7Stbbdev 4751c0b2f7Stbbdev void resume(suspend_point_type* sp) { 4851c0b2f7Stbbdev assert_pointers_valid(sp, sp->m_arena); 4951c0b2f7Stbbdev task_dispatcher& task_disp = sp->m_resume_task.m_target; 5051c0b2f7Stbbdev __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr); 5151c0b2f7Stbbdev 5251c0b2f7Stbbdev // TODO: remove this work-around 5351c0b2f7Stbbdev // Prolong the arena's lifetime while all coroutines are alive 5451c0b2f7Stbbdev // (otherwise the arena can be destroyed while some tasks are suspended). 5551c0b2f7Stbbdev arena& a = *sp->m_arena; 5651c0b2f7Stbbdev a.my_references += arena::ref_external; 5751c0b2f7Stbbdev 5851c0b2f7Stbbdev if (task_disp.m_properties.critical_task_allowed) { 5951c0b2f7Stbbdev // The target is not in the process of executing critical task, so the resume task is not critical. 6051c0b2f7Stbbdev a.my_resume_task_stream.push(&sp->m_resume_task, random_lane_selector(sp->m_random)); 6151c0b2f7Stbbdev } else { 6251c0b2f7Stbbdev #if __TBB_PREVIEW_CRITICAL_TASKS 6351c0b2f7Stbbdev // The target is in the process of executing critical task, so the resume task is critical. 6451c0b2f7Stbbdev a.my_critical_task_stream.push(&sp->m_resume_task, random_lane_selector(sp->m_random)); 6551c0b2f7Stbbdev #endif 6651c0b2f7Stbbdev } 6751c0b2f7Stbbdev 6851c0b2f7Stbbdev // Do not access target after that point. 6951c0b2f7Stbbdev a.advertise_new_work<arena::wakeup>(); 7051c0b2f7Stbbdev 7151c0b2f7Stbbdev // Release our reference to my_arena. 7251c0b2f7Stbbdev a.on_thread_leaving<arena::ref_external>(); 7351c0b2f7Stbbdev } 7451c0b2f7Stbbdev 7551c0b2f7Stbbdev suspend_point_type* current_suspend_point() { 7651c0b2f7Stbbdev thread_data& td = *governor::get_thread_data(); 7751c0b2f7Stbbdev return td.my_task_dispatcher->get_suspend_point(); 7851c0b2f7Stbbdev } 7951c0b2f7Stbbdev 8051c0b2f7Stbbdev static task_dispatcher& create_coroutine(thread_data& td) { 8151c0b2f7Stbbdev // We may have some task dispatchers cached 8251c0b2f7Stbbdev task_dispatcher* task_disp = td.my_arena->my_co_cache.pop(); 8351c0b2f7Stbbdev if (!task_disp) { 8451c0b2f7Stbbdev void* ptr = cache_aligned_allocate(sizeof(task_dispatcher)); 8551c0b2f7Stbbdev task_disp = new(ptr) task_dispatcher(td.my_arena); 8651c0b2f7Stbbdev task_disp->init_suspend_point(td.my_arena, td.my_arena->my_market->worker_stack_size()); 8751c0b2f7Stbbdev } 8851c0b2f7Stbbdev // Prolong the arena's lifetime until all coroutines is alive 8951c0b2f7Stbbdev // (otherwise the arena can be destroyed while some tasks are suspended). 9051c0b2f7Stbbdev // TODO: consider behavior if there are more than 4K external references. 9151c0b2f7Stbbdev td.my_arena->my_references += arena::ref_external; 9251c0b2f7Stbbdev return *task_disp; 9351c0b2f7Stbbdev } 9451c0b2f7Stbbdev 9551c0b2f7Stbbdev void task_dispatcher::suspend(suspend_callback_type suspend_callback, void* user_callback) { 9651c0b2f7Stbbdev __TBB_ASSERT(suspend_callback != nullptr, nullptr); 9751c0b2f7Stbbdev __TBB_ASSERT(user_callback != nullptr, nullptr); 9851c0b2f7Stbbdev __TBB_ASSERT(m_thread_data != nullptr, nullptr); 9951c0b2f7Stbbdev 10051c0b2f7Stbbdev arena_slot* slot = m_thread_data->my_arena_slot; 10151c0b2f7Stbbdev __TBB_ASSERT(slot != nullptr, nullptr); 10251c0b2f7Stbbdev 10351c0b2f7Stbbdev task_dispatcher& default_task_disp = slot->default_task_dispatcher(); 10451c0b2f7Stbbdev // TODO: simplify the next line, e.g. is_task_dispatcher_recalled( task_dispatcher& ) 10551c0b2f7Stbbdev bool is_recalled = default_task_disp.get_suspend_point()->m_is_owner_recalled.load(std::memory_order_acquire); 10651c0b2f7Stbbdev task_dispatcher& target = is_recalled ? default_task_disp : create_coroutine(*m_thread_data); 10751c0b2f7Stbbdev 10851c0b2f7Stbbdev thread_data::suspend_callback_wrapper callback = { suspend_callback, user_callback, get_suspend_point() }; 10951c0b2f7Stbbdev m_thread_data->set_post_resume_action(thread_data::post_resume_action::callback, &callback); 11051c0b2f7Stbbdev resume(target); 11151c0b2f7Stbbdev 11251c0b2f7Stbbdev if (m_properties.outermost) { 11351c0b2f7Stbbdev recall_point(); 11451c0b2f7Stbbdev } 11551c0b2f7Stbbdev } 11651c0b2f7Stbbdev 117*43c1805eSAlex bool task_dispatcher::resume(task_dispatcher& target) { 11851c0b2f7Stbbdev // Do not create non-trivial objects on the stack of this function. They might never be destroyed 11951c0b2f7Stbbdev { 12051c0b2f7Stbbdev thread_data* td = m_thread_data; 12151c0b2f7Stbbdev __TBB_ASSERT(&target != this, "We cannot resume to ourself"); 12251c0b2f7Stbbdev __TBB_ASSERT(td != nullptr, "This task dispatcher must be attach to a thread data"); 12351c0b2f7Stbbdev __TBB_ASSERT(td->my_task_dispatcher == this, "Thread data must be attached to this task dispatcher"); 12451c0b2f7Stbbdev __TBB_ASSERT(td->my_post_resume_action != thread_data::post_resume_action::none, "The post resume action must be set"); 12551c0b2f7Stbbdev __TBB_ASSERT(td->my_post_resume_arg, "The post resume action must have an argument"); 12651c0b2f7Stbbdev 12751c0b2f7Stbbdev // Change the task dispatcher 12851c0b2f7Stbbdev td->detach_task_dispatcher(); 12951c0b2f7Stbbdev td->attach_task_dispatcher(target); 13051c0b2f7Stbbdev } 13151c0b2f7Stbbdev __TBB_ASSERT(m_suspend_point != nullptr, "Suspend point must be created"); 13251c0b2f7Stbbdev __TBB_ASSERT(target.m_suspend_point != nullptr, "Suspend point must be created"); 13351c0b2f7Stbbdev // Swap to the target coroutine. 13451c0b2f7Stbbdev m_suspend_point->m_co_context.resume(target.m_suspend_point->m_co_context); 13551c0b2f7Stbbdev // Pay attention that m_thread_data can be changed after resume 136*43c1805eSAlex if (m_thread_data) { 13751c0b2f7Stbbdev thread_data* td = m_thread_data; 13851c0b2f7Stbbdev __TBB_ASSERT(td != nullptr, "This task dispatcher must be attach to a thread data"); 13951c0b2f7Stbbdev __TBB_ASSERT(td->my_task_dispatcher == this, "Thread data must be attached to this task dispatcher"); 14051c0b2f7Stbbdev td->do_post_resume_action(); 14151c0b2f7Stbbdev 14251c0b2f7Stbbdev // Remove the recall flag if the thread in its original task dispatcher 14351c0b2f7Stbbdev arena_slot* slot = td->my_arena_slot; 14451c0b2f7Stbbdev __TBB_ASSERT(slot != nullptr, nullptr); 14551c0b2f7Stbbdev if (this == slot->my_default_task_dispatcher) { 14651c0b2f7Stbbdev __TBB_ASSERT(m_suspend_point != nullptr, nullptr); 14751c0b2f7Stbbdev m_suspend_point->m_is_owner_recalled.store(false, std::memory_order_relaxed); 14851c0b2f7Stbbdev } 149*43c1805eSAlex return true; 15051c0b2f7Stbbdev } 151*43c1805eSAlex return false; 15251c0b2f7Stbbdev } 15351c0b2f7Stbbdev 15451c0b2f7Stbbdev void thread_data::do_post_resume_action() { 15551c0b2f7Stbbdev __TBB_ASSERT(my_post_resume_action != thread_data::post_resume_action::none, "The post resume action must be set"); 15651c0b2f7Stbbdev __TBB_ASSERT(my_post_resume_arg, "The post resume action must have an argument"); 15751c0b2f7Stbbdev 15851c0b2f7Stbbdev switch (my_post_resume_action) { 15951c0b2f7Stbbdev case post_resume_action::register_waiter: 16051c0b2f7Stbbdev { 1614523a761Stbbdev static_cast<market_concurrent_monitor::resume_context*>(my_post_resume_arg)->notify(); 1628dcbd5b1Stbbdev break; 16351c0b2f7Stbbdev } 1648dcbd5b1Stbbdev case post_resume_action::resume: 1658dcbd5b1Stbbdev { 1668dcbd5b1Stbbdev r1::resume(static_cast<suspend_point_type*>(my_post_resume_arg)); 16751c0b2f7Stbbdev break; 16851c0b2f7Stbbdev } 16951c0b2f7Stbbdev case post_resume_action::callback: 17051c0b2f7Stbbdev { 17151c0b2f7Stbbdev suspend_callback_wrapper callback = *static_cast<suspend_callback_wrapper*>(my_post_resume_arg); 17251c0b2f7Stbbdev callback(); 17351c0b2f7Stbbdev break; 17451c0b2f7Stbbdev } 17551c0b2f7Stbbdev case post_resume_action::cleanup: 17651c0b2f7Stbbdev { 17751c0b2f7Stbbdev task_dispatcher* to_cleanup = static_cast<task_dispatcher*>(my_post_resume_arg); 178*43c1805eSAlex // Release coroutine's reference to my_arena 17951c0b2f7Stbbdev my_arena->on_thread_leaving<arena::ref_external>(); 18051c0b2f7Stbbdev // Cache the coroutine for possible later re-usage 18151c0b2f7Stbbdev my_arena->my_co_cache.push(to_cleanup); 18251c0b2f7Stbbdev break; 18351c0b2f7Stbbdev } 18451c0b2f7Stbbdev case post_resume_action::notify: 18551c0b2f7Stbbdev { 186*43c1805eSAlex suspend_point_type* sp = static_cast<suspend_point_type*>(my_post_resume_arg); 187*43c1805eSAlex sp->m_is_owner_recalled.store(true, std::memory_order_release); 188*43c1805eSAlex // Do not access sp because it can be destroyed after the store 189*43c1805eSAlex 190*43c1805eSAlex auto is_our_suspend_point = [sp](market_context ctx) { 191*43c1805eSAlex return std::uintptr_t(sp) == ctx.my_uniq_addr; 192*43c1805eSAlex }; 193*43c1805eSAlex my_arena->my_market->get_wait_list().notify(is_our_suspend_point); 19451c0b2f7Stbbdev break; 19551c0b2f7Stbbdev } 19651c0b2f7Stbbdev default: 19751c0b2f7Stbbdev __TBB_ASSERT(false, "Unknown post resume action"); 19851c0b2f7Stbbdev } 19951c0b2f7Stbbdev 20051c0b2f7Stbbdev my_post_resume_action = post_resume_action::none; 20151c0b2f7Stbbdev my_post_resume_arg = nullptr; 20251c0b2f7Stbbdev } 20351c0b2f7Stbbdev 20451c0b2f7Stbbdev #else 20551c0b2f7Stbbdev 20651c0b2f7Stbbdev void suspend(suspend_callback_type, void*) { 20751c0b2f7Stbbdev __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform"); 20851c0b2f7Stbbdev } 20951c0b2f7Stbbdev 21051c0b2f7Stbbdev void resume(suspend_point_type*) { 21151c0b2f7Stbbdev __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform"); 21251c0b2f7Stbbdev } 21351c0b2f7Stbbdev 21451c0b2f7Stbbdev suspend_point_type* current_suspend_point() { 21551c0b2f7Stbbdev __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform"); 21651c0b2f7Stbbdev return nullptr; 21751c0b2f7Stbbdev } 21851c0b2f7Stbbdev 21951c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */ 22051c0b2f7Stbbdev 2218dcbd5b1Stbbdev void notify_waiters(std::uintptr_t wait_ctx_addr) { 2224523a761Stbbdev auto is_related_wait_ctx = [&] (market_context context) { 2238dcbd5b1Stbbdev return wait_ctx_addr == context.my_uniq_addr; 22449e08aacStbbdev }; 22551c0b2f7Stbbdev 22649e08aacStbbdev r1::governor::get_thread_data()->my_arena->my_market->get_wait_list().notify(is_related_wait_ctx); 22751c0b2f7Stbbdev } 22851c0b2f7Stbbdev 22951c0b2f7Stbbdev } // namespace r1 23051c0b2f7Stbbdev } // namespace detail 23151c0b2f7Stbbdev } // namespace tbb 23251c0b2f7Stbbdev 233