xref: /oneTBB/src/tbb/task.cpp (revision c4568449)
151c0b2f7Stbbdev /*
2*c4568449SPavel Kumbrasev     Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev // Do not include task.h directly. Use scheduler_common.h instead
1851c0b2f7Stbbdev #include "scheduler_common.h"
1951c0b2f7Stbbdev #include "governor.h"
2051c0b2f7Stbbdev #include "arena.h"
2151c0b2f7Stbbdev #include "thread_data.h"
2251c0b2f7Stbbdev #include "task_dispatcher.h"
2351c0b2f7Stbbdev #include "waiters.h"
2451c0b2f7Stbbdev #include "itt_notify.h"
2551c0b2f7Stbbdev 
2649e08aacStbbdev #include "oneapi/tbb/detail/_task.h"
2749e08aacStbbdev #include "oneapi/tbb/partitioner.h"
2849e08aacStbbdev #include "oneapi/tbb/task.h"
2951c0b2f7Stbbdev 
3051c0b2f7Stbbdev #include <cstring>
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev namespace tbb {
3351c0b2f7Stbbdev namespace detail {
3451c0b2f7Stbbdev namespace r1 {
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev //------------------------------------------------------------------------
3751c0b2f7Stbbdev // resumable tasks
3851c0b2f7Stbbdev //------------------------------------------------------------------------
3951c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
4051c0b2f7Stbbdev 
suspend(suspend_callback_type suspend_callback,void * user_callback)4151c0b2f7Stbbdev void suspend(suspend_callback_type suspend_callback, void* user_callback) {
4251c0b2f7Stbbdev     thread_data& td = *governor::get_thread_data();
4351c0b2f7Stbbdev     td.my_task_dispatcher->suspend(suspend_callback, user_callback);
4451c0b2f7Stbbdev     // Do not access td after suspend.
4551c0b2f7Stbbdev }
4651c0b2f7Stbbdev 
resume(suspend_point_type * sp)4751c0b2f7Stbbdev void resume(suspend_point_type* sp) {
4851c0b2f7Stbbdev     assert_pointers_valid(sp, sp->m_arena);
4951c0b2f7Stbbdev     task_dispatcher& task_disp = sp->m_resume_task.m_target;
5051c0b2f7Stbbdev 
51e77098d6SPavel Kumbrasev     if (sp->try_notify_resume()) {
5251c0b2f7Stbbdev         // TODO: remove this work-around
5351c0b2f7Stbbdev         // Prolong the arena's lifetime while all coroutines are alive
5451c0b2f7Stbbdev         // (otherwise the arena can be destroyed while some tasks are suspended).
5551c0b2f7Stbbdev         arena& a = *sp->m_arena;
56*c4568449SPavel Kumbrasev         a.my_references += arena::ref_worker;
5751c0b2f7Stbbdev 
5851c0b2f7Stbbdev         if (task_disp.m_properties.critical_task_allowed) {
5951c0b2f7Stbbdev             // The target is not in the process of executing critical task, so the resume task is not critical.
6051c0b2f7Stbbdev             a.my_resume_task_stream.push(&sp->m_resume_task, random_lane_selector(sp->m_random));
6151c0b2f7Stbbdev         } else {
6251c0b2f7Stbbdev     #if __TBB_PREVIEW_CRITICAL_TASKS
6351c0b2f7Stbbdev             // The target is in the process of executing critical task, so the resume task is critical.
6451c0b2f7Stbbdev             a.my_critical_task_stream.push(&sp->m_resume_task, random_lane_selector(sp->m_random));
6551c0b2f7Stbbdev     #endif
6651c0b2f7Stbbdev         }
6751c0b2f7Stbbdev         // Do not access target after that point.
6851c0b2f7Stbbdev         a.advertise_new_work<arena::wakeup>();
6951c0b2f7Stbbdev         // Release our reference to my_arena.
70*c4568449SPavel Kumbrasev         a.on_thread_leaving(arena::ref_worker);
7151c0b2f7Stbbdev     }
7251c0b2f7Stbbdev 
73e77098d6SPavel Kumbrasev }
74e77098d6SPavel Kumbrasev 
current_suspend_point()7551c0b2f7Stbbdev suspend_point_type* current_suspend_point() {
7651c0b2f7Stbbdev     thread_data& td = *governor::get_thread_data();
7751c0b2f7Stbbdev     return td.my_task_dispatcher->get_suspend_point();
7851c0b2f7Stbbdev }
7951c0b2f7Stbbdev 
create_coroutine(thread_data & td)80*c4568449SPavel Kumbrasev task_dispatcher& create_coroutine(thread_data& td) {
8151c0b2f7Stbbdev     // We may have some task dispatchers cached
8251c0b2f7Stbbdev     task_dispatcher* task_disp = td.my_arena->my_co_cache.pop();
8351c0b2f7Stbbdev     if (!task_disp) {
8451c0b2f7Stbbdev         void* ptr = cache_aligned_allocate(sizeof(task_dispatcher));
8551c0b2f7Stbbdev         task_disp = new(ptr) task_dispatcher(td.my_arena);
86*c4568449SPavel Kumbrasev         task_disp->init_suspend_point(td.my_arena, td.my_arena->my_threading_control->worker_stack_size());
8751c0b2f7Stbbdev     }
8851c0b2f7Stbbdev     // Prolong the arena's lifetime until all coroutines is alive
8951c0b2f7Stbbdev     // (otherwise the arena can be destroyed while some tasks are suspended).
9051c0b2f7Stbbdev     // TODO: consider behavior if there are more than 4K external references.
9151c0b2f7Stbbdev     td.my_arena->my_references += arena::ref_external;
9251c0b2f7Stbbdev     return *task_disp;
9351c0b2f7Stbbdev }
9451c0b2f7Stbbdev 
internal_suspend()95e77098d6SPavel Kumbrasev void task_dispatcher::internal_suspend() {
9651c0b2f7Stbbdev     __TBB_ASSERT(m_thread_data != nullptr, nullptr);
9751c0b2f7Stbbdev 
9851c0b2f7Stbbdev     arena_slot* slot = m_thread_data->my_arena_slot;
9951c0b2f7Stbbdev     __TBB_ASSERT(slot != nullptr, nullptr);
10051c0b2f7Stbbdev 
10151c0b2f7Stbbdev     task_dispatcher& default_task_disp = slot->default_task_dispatcher();
10251c0b2f7Stbbdev     // TODO: simplify the next line, e.g. is_task_dispatcher_recalled( task_dispatcher& )
10351c0b2f7Stbbdev     bool is_recalled = default_task_disp.get_suspend_point()->m_is_owner_recalled.load(std::memory_order_acquire);
10451c0b2f7Stbbdev     task_dispatcher& target = is_recalled ? default_task_disp : create_coroutine(*m_thread_data);
10551c0b2f7Stbbdev 
10651c0b2f7Stbbdev     resume(target);
10751c0b2f7Stbbdev 
10851c0b2f7Stbbdev     if (m_properties.outermost) {
10951c0b2f7Stbbdev         recall_point();
11051c0b2f7Stbbdev     }
11151c0b2f7Stbbdev }
11251c0b2f7Stbbdev 
suspend(suspend_callback_type suspend_callback,void * user_callback)113e77098d6SPavel Kumbrasev void task_dispatcher::suspend(suspend_callback_type suspend_callback, void* user_callback) {
114e77098d6SPavel Kumbrasev     __TBB_ASSERT(suspend_callback != nullptr, nullptr);
115e77098d6SPavel Kumbrasev     __TBB_ASSERT(user_callback != nullptr, nullptr);
116e77098d6SPavel Kumbrasev     suspend_callback(user_callback, get_suspend_point());
117e77098d6SPavel Kumbrasev 
118e77098d6SPavel Kumbrasev     __TBB_ASSERT(m_thread_data != nullptr, nullptr);
119e77098d6SPavel Kumbrasev     __TBB_ASSERT(m_thread_data->my_post_resume_action == post_resume_action::none, nullptr);
120e77098d6SPavel Kumbrasev     __TBB_ASSERT(m_thread_data->my_post_resume_arg == nullptr, nullptr);
121e77098d6SPavel Kumbrasev     internal_suspend();
122e77098d6SPavel Kumbrasev }
123e77098d6SPavel Kumbrasev 
resume(task_dispatcher & target)12443c1805eSAlex bool task_dispatcher::resume(task_dispatcher& target) {
12551c0b2f7Stbbdev     // Do not create non-trivial objects on the stack of this function. They might never be destroyed
12651c0b2f7Stbbdev     {
12751c0b2f7Stbbdev         thread_data* td = m_thread_data;
12851c0b2f7Stbbdev         __TBB_ASSERT(&target != this, "We cannot resume to ourself");
12951c0b2f7Stbbdev         __TBB_ASSERT(td != nullptr, "This task dispatcher must be attach to a thread data");
13051c0b2f7Stbbdev         __TBB_ASSERT(td->my_task_dispatcher == this, "Thread data must be attached to this task dispatcher");
13151c0b2f7Stbbdev 
13251c0b2f7Stbbdev         // Change the task dispatcher
13351c0b2f7Stbbdev         td->detach_task_dispatcher();
13451c0b2f7Stbbdev         td->attach_task_dispatcher(target);
13551c0b2f7Stbbdev     }
13651c0b2f7Stbbdev     __TBB_ASSERT(m_suspend_point != nullptr, "Suspend point must be created");
13751c0b2f7Stbbdev     __TBB_ASSERT(target.m_suspend_point != nullptr, "Suspend point must be created");
13851c0b2f7Stbbdev     // Swap to the target coroutine.
139e77098d6SPavel Kumbrasev 
140e77098d6SPavel Kumbrasev     m_suspend_point->resume(target.m_suspend_point);
14151c0b2f7Stbbdev     // Pay attention that m_thread_data can be changed after resume
14243c1805eSAlex     if (m_thread_data) {
14351c0b2f7Stbbdev         thread_data* td = m_thread_data;
14451c0b2f7Stbbdev         __TBB_ASSERT(td != nullptr, "This task dispatcher must be attach to a thread data");
14551c0b2f7Stbbdev         __TBB_ASSERT(td->my_task_dispatcher == this, "Thread data must be attached to this task dispatcher");
146e77098d6SPavel Kumbrasev         do_post_resume_action();
14751c0b2f7Stbbdev 
14851c0b2f7Stbbdev         // Remove the recall flag if the thread in its original task dispatcher
14951c0b2f7Stbbdev         arena_slot* slot = td->my_arena_slot;
15051c0b2f7Stbbdev         __TBB_ASSERT(slot != nullptr, nullptr);
15151c0b2f7Stbbdev         if (this == slot->my_default_task_dispatcher) {
15251c0b2f7Stbbdev             __TBB_ASSERT(m_suspend_point != nullptr, nullptr);
15351c0b2f7Stbbdev             m_suspend_point->m_is_owner_recalled.store(false, std::memory_order_relaxed);
15451c0b2f7Stbbdev         }
15543c1805eSAlex         return true;
15651c0b2f7Stbbdev     }
15743c1805eSAlex     return false;
15851c0b2f7Stbbdev }
15951c0b2f7Stbbdev 
do_post_resume_action()160e77098d6SPavel Kumbrasev void task_dispatcher::do_post_resume_action() {
161e77098d6SPavel Kumbrasev     thread_data* td = m_thread_data;
162e77098d6SPavel Kumbrasev     switch (td->my_post_resume_action) {
16351c0b2f7Stbbdev     case post_resume_action::register_waiter:
16451c0b2f7Stbbdev     {
165e77098d6SPavel Kumbrasev         __TBB_ASSERT(td->my_post_resume_arg, "The post resume action must have an argument");
166*c4568449SPavel Kumbrasev         static_cast<thread_control_monitor::resume_context*>(td->my_post_resume_arg)->notify();
16751c0b2f7Stbbdev         break;
16851c0b2f7Stbbdev     }
16951c0b2f7Stbbdev     case post_resume_action::cleanup:
17051c0b2f7Stbbdev     {
171e77098d6SPavel Kumbrasev         __TBB_ASSERT(td->my_post_resume_arg, "The post resume action must have an argument");
172e77098d6SPavel Kumbrasev         task_dispatcher* to_cleanup = static_cast<task_dispatcher*>(td->my_post_resume_arg);
17343c1805eSAlex         // Release coroutine's reference to my_arena
174*c4568449SPavel Kumbrasev         td->my_arena->on_thread_leaving(arena::ref_external);
17551c0b2f7Stbbdev         // Cache the coroutine for possible later re-usage
176e77098d6SPavel Kumbrasev         td->my_arena->my_co_cache.push(to_cleanup);
17751c0b2f7Stbbdev         break;
17851c0b2f7Stbbdev     }
17951c0b2f7Stbbdev     case post_resume_action::notify:
18051c0b2f7Stbbdev     {
181e77098d6SPavel Kumbrasev         __TBB_ASSERT(td->my_post_resume_arg, "The post resume action must have an argument");
182e77098d6SPavel Kumbrasev         suspend_point_type* sp = static_cast<suspend_point_type*>(td->my_post_resume_arg);
183e77098d6SPavel Kumbrasev         sp->recall_owner();
184e77098d6SPavel Kumbrasev         // Do not access sp because it can be destroyed after recall
18543c1805eSAlex 
18643c1805eSAlex         auto is_our_suspend_point = [sp] (market_context ctx) {
18743c1805eSAlex             return std::uintptr_t(sp) == ctx.my_uniq_addr;
18843c1805eSAlex         };
189*c4568449SPavel Kumbrasev         td->my_arena->get_waiting_threads_monitor().notify(is_our_suspend_point);
19051c0b2f7Stbbdev         break;
19151c0b2f7Stbbdev     }
19251c0b2f7Stbbdev     default:
193e77098d6SPavel Kumbrasev         __TBB_ASSERT(td->my_post_resume_action == post_resume_action::none, "Unknown post resume action");
194e77098d6SPavel Kumbrasev         __TBB_ASSERT(td->my_post_resume_arg == nullptr, "The post resume argument should not be set");
19551c0b2f7Stbbdev     }
196e77098d6SPavel Kumbrasev     td->clear_post_resume_action();
19751c0b2f7Stbbdev }
19851c0b2f7Stbbdev 
19951c0b2f7Stbbdev #else
20051c0b2f7Stbbdev 
20151c0b2f7Stbbdev void suspend(suspend_callback_type, void*) {
20251c0b2f7Stbbdev     __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform");
20351c0b2f7Stbbdev }
20451c0b2f7Stbbdev 
20551c0b2f7Stbbdev void resume(suspend_point_type*) {
20651c0b2f7Stbbdev     __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform");
20751c0b2f7Stbbdev }
20851c0b2f7Stbbdev 
20951c0b2f7Stbbdev suspend_point_type* current_suspend_point() {
21051c0b2f7Stbbdev     __TBB_ASSERT_RELEASE(false, "Resumable tasks are unsupported on this platform");
21151c0b2f7Stbbdev     return nullptr;
21251c0b2f7Stbbdev }
21351c0b2f7Stbbdev 
21451c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */
21551c0b2f7Stbbdev 
notify_waiters(std::uintptr_t wait_ctx_addr)2168dcbd5b1Stbbdev void notify_waiters(std::uintptr_t wait_ctx_addr) {
2174523a761Stbbdev     auto is_related_wait_ctx = [&] (market_context context) {
2188dcbd5b1Stbbdev         return wait_ctx_addr == context.my_uniq_addr;
21949e08aacStbbdev     };
22051c0b2f7Stbbdev 
221*c4568449SPavel Kumbrasev     governor::get_thread_data()->my_arena->get_waiting_threads_monitor().notify(is_related_wait_ctx);
22251c0b2f7Stbbdev }
22351c0b2f7Stbbdev 
22451c0b2f7Stbbdev } // namespace r1
22551c0b2f7Stbbdev } // namespace detail
22651c0b2f7Stbbdev } // namespace tbb
22751c0b2f7Stbbdev 
228