xref: /oneTBB/src/tbb/task.cpp (revision 49e08aac)
151c0b2f7Stbbdev /*
251c0b2f7Stbbdev     Copyright (c) 2005-2020 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev // Do not include task.h directly. Use scheduler_common.h instead
1851c0b2f7Stbbdev #include "scheduler_common.h"
1951c0b2f7Stbbdev #include "governor.h"
2051c0b2f7Stbbdev #include "arena.h"
2151c0b2f7Stbbdev #include "thread_data.h"
2251c0b2f7Stbbdev #include "task_dispatcher.h"
2351c0b2f7Stbbdev #include "waiters.h"
2451c0b2f7Stbbdev #include "itt_notify.h"
2551c0b2f7Stbbdev 
26*49e08aacStbbdev #include "oneapi/tbb/detail/_task.h"
27*49e08aacStbbdev #include "oneapi/tbb/partitioner.h"
28*49e08aacStbbdev #include "oneapi/tbb/task.h"
2951c0b2f7Stbbdev 
3051c0b2f7Stbbdev #include <cstring>
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev namespace tbb {
3351c0b2f7Stbbdev namespace detail {
3451c0b2f7Stbbdev namespace r1 {
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev //------------------------------------------------------------------------
3751c0b2f7Stbbdev // resumable tasks
3851c0b2f7Stbbdev //------------------------------------------------------------------------
3951c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev void suspend(suspend_callback_type suspend_callback, void* user_callback) {
4251c0b2f7Stbbdev     thread_data& td = *governor::get_thread_data();
4351c0b2f7Stbbdev     td.my_task_dispatcher->suspend(suspend_callback, user_callback);
4451c0b2f7Stbbdev     // Do not access td after suspend.
4551c0b2f7Stbbdev }
4651c0b2f7Stbbdev 
4751c0b2f7Stbbdev void resume(suspend_point_type* sp) {
4851c0b2f7Stbbdev     assert_pointers_valid(sp, sp->m_arena);
4951c0b2f7Stbbdev     task_dispatcher& task_disp = sp->m_resume_task.m_target;
5051c0b2f7Stbbdev     __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
5151c0b2f7Stbbdev 
5251c0b2f7Stbbdev     // TODO: remove this work-around
5351c0b2f7Stbbdev     // Prolong the arena's lifetime while all coroutines are alive
5451c0b2f7Stbbdev     // (otherwise the arena can be destroyed while some tasks are suspended).
5551c0b2f7Stbbdev     arena& a = *sp->m_arena;
5651c0b2f7Stbbdev     a.my_references += arena::ref_external;
5751c0b2f7Stbbdev 
5851c0b2f7Stbbdev     if (task_disp.m_properties.critical_task_allowed) {
5951c0b2f7Stbbdev         // The target is not in the process of executing critical task, so the resume task is not critical.
6051c0b2f7Stbbdev         a.my_resume_task_stream.push(&sp->m_resume_task, random_lane_selector(sp->m_random));
6151c0b2f7Stbbdev     } else {
6251c0b2f7Stbbdev #if __TBB_PREVIEW_CRITICAL_TASKS
6351c0b2f7Stbbdev         // The target is in the process of executing critical task, so the resume task is critical.
6451c0b2f7Stbbdev         a.my_critical_task_stream.push(&sp->m_resume_task, random_lane_selector(sp->m_random));
6551c0b2f7Stbbdev #endif
6651c0b2f7Stbbdev     }
6751c0b2f7Stbbdev 
6851c0b2f7Stbbdev     // Do not access target after that point.
6951c0b2f7Stbbdev     a.advertise_new_work<arena::wakeup>();
7051c0b2f7Stbbdev 
7151c0b2f7Stbbdev     // Release our reference to my_arena.
7251c0b2f7Stbbdev     a.on_thread_leaving<arena::ref_external>();
7351c0b2f7Stbbdev }
7451c0b2f7Stbbdev 
7551c0b2f7Stbbdev suspend_point_type* current_suspend_point() {
7651c0b2f7Stbbdev     thread_data& td = *governor::get_thread_data();
7751c0b2f7Stbbdev     return td.my_task_dispatcher->get_suspend_point();
7851c0b2f7Stbbdev }
7951c0b2f7Stbbdev 
8051c0b2f7Stbbdev static task_dispatcher& create_coroutine(thread_data& td) {
8151c0b2f7Stbbdev     // We may have some task dispatchers cached
8251c0b2f7Stbbdev     task_dispatcher* task_disp = td.my_arena->my_co_cache.pop();
8351c0b2f7Stbbdev     if (!task_disp) {
8451c0b2f7Stbbdev         void* ptr = cache_aligned_allocate(sizeof(task_dispatcher));
8551c0b2f7Stbbdev         task_disp = new(ptr) task_dispatcher(td.my_arena);
8651c0b2f7Stbbdev         task_disp->init_suspend_point(td.my_arena, td.my_arena->my_market->worker_stack_size());
8751c0b2f7Stbbdev     }
8851c0b2f7Stbbdev     // Prolong the arena's lifetime until all coroutines is alive
8951c0b2f7Stbbdev     // (otherwise the arena can be destroyed while some tasks are suspended).
9051c0b2f7Stbbdev     // TODO: consider behavior if there are more than 4K external references.
9151c0b2f7Stbbdev     td.my_arena->my_references += arena::ref_external;
9251c0b2f7Stbbdev     return *task_disp;
9351c0b2f7Stbbdev }
9451c0b2f7Stbbdev 
9551c0b2f7Stbbdev void task_dispatcher::suspend(suspend_callback_type suspend_callback, void* user_callback) {
9651c0b2f7Stbbdev     __TBB_ASSERT(suspend_callback != nullptr, nullptr);
9751c0b2f7Stbbdev     __TBB_ASSERT(user_callback != nullptr, nullptr);
9851c0b2f7Stbbdev     __TBB_ASSERT(m_thread_data != nullptr, nullptr);
9951c0b2f7Stbbdev 
10051c0b2f7Stbbdev     arena_slot* slot = m_thread_data->my_arena_slot;
10151c0b2f7Stbbdev     __TBB_ASSERT(slot != nullptr, nullptr);
10251c0b2f7Stbbdev 
10351c0b2f7Stbbdev     task_dispatcher& default_task_disp = slot->default_task_dispatcher();
10451c0b2f7Stbbdev     // TODO: simplify the next line, e.g. is_task_dispatcher_recalled( task_dispatcher& )
10551c0b2f7Stbbdev     bool is_recalled = default_task_disp.get_suspend_point()->m_is_owner_recalled.load(std::memory_order_acquire);
10651c0b2f7Stbbdev     task_dispatcher& target = is_recalled ? default_task_disp : create_coroutine(*m_thread_data);
10751c0b2f7Stbbdev 
10851c0b2f7Stbbdev     thread_data::suspend_callback_wrapper callback = { suspend_callback, user_callback, get_suspend_point() };
10951c0b2f7Stbbdev     m_thread_data->set_post_resume_action(thread_data::post_resume_action::callback, &callback);
11051c0b2f7Stbbdev     resume(target);
11151c0b2f7Stbbdev 
11251c0b2f7Stbbdev     if (m_properties.outermost) {
11351c0b2f7Stbbdev         recall_point();
11451c0b2f7Stbbdev     }
11551c0b2f7Stbbdev }
11651c0b2f7Stbbdev 
11751c0b2f7Stbbdev void task_dispatcher::resume(task_dispatcher& target) {
11851c0b2f7Stbbdev     // Do not create non-trivial objects on the stack of this function. They might never be destroyed
11951c0b2f7Stbbdev     {
12051c0b2f7Stbbdev         thread_data* td = m_thread_data;
12151c0b2f7Stbbdev         __TBB_ASSERT(&target != this, "We cannot resume to ourself");
12251c0b2f7Stbbdev         __TBB_ASSERT(td != nullptr, "This task dispatcher must be attach to a thread data");
12351c0b2f7Stbbdev         __TBB_ASSERT(td->my_task_dispatcher == this, "Thread data must be attached to this task dispatcher");
12451c0b2f7Stbbdev         __TBB_ASSERT(td->my_post_resume_action != thread_data::post_resume_action::none, "The post resume action must be set");
12551c0b2f7Stbbdev         __TBB_ASSERT(td->my_post_resume_arg, "The post resume action must have an argument");
12651c0b2f7Stbbdev 
12751c0b2f7Stbbdev         // Change the task dispatcher
12851c0b2f7Stbbdev         td->detach_task_dispatcher();
12951c0b2f7Stbbdev         td->attach_task_dispatcher(target);
13051c0b2f7Stbbdev     }
13151c0b2f7Stbbdev     __TBB_ASSERT(m_suspend_point != nullptr, "Suspend point must be created");
13251c0b2f7Stbbdev     __TBB_ASSERT(target.m_suspend_point != nullptr, "Suspend point must be created");
13351c0b2f7Stbbdev     // Swap to the target coroutine.
13451c0b2f7Stbbdev     m_suspend_point->m_co_context.resume(target.m_suspend_point->m_co_context);
13551c0b2f7Stbbdev     // Pay attention that m_thread_data can be changed after resume
13651c0b2f7Stbbdev     {
13751c0b2f7Stbbdev         thread_data* td = m_thread_data;
13851c0b2f7Stbbdev         __TBB_ASSERT(td != nullptr, "This task dispatcher must be attach to a thread data");
13951c0b2f7Stbbdev         __TBB_ASSERT(td->my_task_dispatcher == this, "Thread data must be attached to this task dispatcher");
14051c0b2f7Stbbdev         td->do_post_resume_action();
14151c0b2f7Stbbdev 
14251c0b2f7Stbbdev         // Remove the recall flag if the thread in its original task dispatcher
14351c0b2f7Stbbdev         arena_slot* slot = td->my_arena_slot;
14451c0b2f7Stbbdev         __TBB_ASSERT(slot != nullptr, nullptr);
14551c0b2f7Stbbdev         if (this == slot->my_default_task_dispatcher) {
14651c0b2f7Stbbdev             __TBB_ASSERT(m_suspend_point != nullptr, nullptr);
14751c0b2f7Stbbdev             m_suspend_point->m_is_owner_recalled.store(false, std::memory_order_relaxed);
14851c0b2f7Stbbdev         }
14951c0b2f7Stbbdev     }
15051c0b2f7Stbbdev }
15151c0b2f7Stbbdev 
15251c0b2f7Stbbdev void thread_data::do_post_resume_action() {
15351c0b2f7Stbbdev     __TBB_ASSERT(my_post_resume_action != thread_data::post_resume_action::none, "The post resume action must be set");
15451c0b2f7Stbbdev     __TBB_ASSERT(my_post_resume_arg, "The post resume action must have an argument");
15551c0b2f7Stbbdev 
15651c0b2f7Stbbdev     switch (my_post_resume_action) {
15751c0b2f7Stbbdev     case post_resume_action::register_waiter:
15851c0b2f7Stbbdev     {
15951c0b2f7Stbbdev         auto& data = *static_cast<thread_data::register_waiter_data*>(my_post_resume_arg);
160*49e08aacStbbdev         using state = wait_node::node_state;
161*49e08aacStbbdev         state expected = state::not_ready;
16251c0b2f7Stbbdev 
163*49e08aacStbbdev         // There are three possible situations:
164*49e08aacStbbdev         // - wait_context has finished => call resume by ourselves
165*49e08aacStbbdev         // - wait_context::continue_execution() returns true, but CAS fails => call resume by ourselves
166*49e08aacStbbdev         // - wait_context::continue_execution() returns true, and CAS succeeds => successfully committed to wait list
167*49e08aacStbbdev         if (!data.wo->continue_execution() ||
168*49e08aacStbbdev #if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
169*49e08aacStbbdev             !((std::atomic<unsigned>&)data.node.my_ready_flag).compare_exchange_strong((unsigned&)expected, (unsigned)state::ready))
170*49e08aacStbbdev #else
171*49e08aacStbbdev             !data.node.my_ready_flag.compare_exchange_strong(expected, state::ready))
172*49e08aacStbbdev #endif
173*49e08aacStbbdev         {
174*49e08aacStbbdev             data.node.my_suspend_point->m_arena->my_market->get_wait_list().cancel_wait(data.node);
17551c0b2f7Stbbdev             r1::resume(data.node.my_suspend_point);
17651c0b2f7Stbbdev         }
17751c0b2f7Stbbdev 
17851c0b2f7Stbbdev         break;
17951c0b2f7Stbbdev     }
18051c0b2f7Stbbdev     case post_resume_action::callback:
18151c0b2f7Stbbdev     {
18251c0b2f7Stbbdev         suspend_callback_wrapper callback = *static_cast<suspend_callback_wrapper*>(my_post_resume_arg);
18351c0b2f7Stbbdev         callback();
18451c0b2f7Stbbdev         break;
18551c0b2f7Stbbdev     }
18651c0b2f7Stbbdev     case post_resume_action::cleanup:
18751c0b2f7Stbbdev     {
18851c0b2f7Stbbdev         task_dispatcher* to_cleanup = static_cast<task_dispatcher*>(my_post_resume_arg);
18951c0b2f7Stbbdev         // Release coroutine's reference to my_arena.
19051c0b2f7Stbbdev         my_arena->on_thread_leaving<arena::ref_external>();
19151c0b2f7Stbbdev         // Cache the coroutine for possible later re-usage
19251c0b2f7Stbbdev         my_arena->my_co_cache.push(to_cleanup);
19351c0b2f7Stbbdev         break;
19451c0b2f7Stbbdev     }
19551c0b2f7Stbbdev     case post_resume_action::notify:
19651c0b2f7Stbbdev     {
19751c0b2f7Stbbdev         std::atomic<bool>& owner_recall_flag = *static_cast<std::atomic<bool>*>(my_post_resume_arg);
19851c0b2f7Stbbdev         owner_recall_flag.store(true, std::memory_order_release);
19951c0b2f7Stbbdev         // Do not access recall_flag because it can be destroyed after the notification.
20051c0b2f7Stbbdev         break;
20151c0b2f7Stbbdev     }
20251c0b2f7Stbbdev     default:
20351c0b2f7Stbbdev         __TBB_ASSERT(false, "Unknown post resume action");
20451c0b2f7Stbbdev     }
20551c0b2f7Stbbdev 
20651c0b2f7Stbbdev     my_post_resume_action = post_resume_action::none;
20751c0b2f7Stbbdev     my_post_resume_arg = nullptr;
20851c0b2f7Stbbdev }
20951c0b2f7Stbbdev 
21051c0b2f7Stbbdev #else
21151c0b2f7Stbbdev 
21251c0b2f7Stbbdev void suspend(suspend_callback_type, void*) {
21351c0b2f7Stbbdev     __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform");
21451c0b2f7Stbbdev }
21551c0b2f7Stbbdev 
21651c0b2f7Stbbdev void resume(suspend_point_type*) {
21751c0b2f7Stbbdev     __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform");
21851c0b2f7Stbbdev }
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev suspend_point_type* current_suspend_point() {
22151c0b2f7Stbbdev     __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform");
22251c0b2f7Stbbdev     return nullptr;
22351c0b2f7Stbbdev }
22451c0b2f7Stbbdev 
22551c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */
22651c0b2f7Stbbdev 
227*49e08aacStbbdev void notify_waiters(std::uintptr_t wait_ctx_tag) {
228*49e08aacStbbdev     auto is_related_wait_ctx = [&] (extended_context context) {
229*49e08aacStbbdev         return wait_ctx_tag == context.uniq_ctx;
230*49e08aacStbbdev     };
23151c0b2f7Stbbdev 
232*49e08aacStbbdev     r1::governor::get_thread_data()->my_arena->my_market->get_wait_list().notify(is_related_wait_ctx);
23351c0b2f7Stbbdev }
23451c0b2f7Stbbdev 
23551c0b2f7Stbbdev } // namespace r1
23651c0b2f7Stbbdev } // namespace detail
23751c0b2f7Stbbdev } // namespace tbb
23851c0b2f7Stbbdev 
239