/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"
#include "intrusive_list.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;
class thread_dispatcher_client;

class context_list : public intrusive_list<d1::intrusive_list_node> {
public:
    //! Set by the owning thread when it detaches; the last remove() then frees the list.
    bool orphaned{false};

    //! Last state propagation epoch known to this thread
    /** Together with the_context_state_propagation_epoch, it constitutes a synchronization
    protocol that keeps the hot path of task group context construction/destruction
    mostly lock-free.
    When the local epoch equals the global one, the state of the task group contexts
    registered with this thread is consistent with that of the task group trees
    they belong to. **/
    std::atomic<std::uintptr_t> epoch{};

    //! Mutex protecting access to the list of task group contexts.
    d1::mutex m_mutex{};

    void destroy() {
        this->~context_list();
        cache_aligned_deallocate(this);
    }

    void remove(d1::intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<d1::intrusive_list_node>::remove(val);

        if (orphaned && empty()) {
            lock.release();
            destroy();
        }
    }

    void push_front(d1::intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<d1::intrusive_list_node>::push_front(val);
    }

    void orphan() {
        mutex::scoped_lock lock(m_mutex);

        orphaned = true;
        if (empty()) {
            lock.release();
            destroy();
        }
    }
};

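// A minimal lifecycle sketch (illustrative only; the actual call sites live in
// task_group_context and thread_data code, and the names below follow the
// my_node member referenced later in this header):
//
//     list->push_front(ctx.my_node);  // context registers with its owner thread
//     ...
//     list->remove(ctx.my_node);      // context destructor unregisters
//     list->orphan();                 // owner thread exits (~thread_data)
//
// Whichever of orphan() or the last remove() observes the list both orphaned
// and empty calls destroy(), so contexts may safely outlive their owner thread.
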
//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public d1::intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{ nullptr }
        , my_last_client{ nullptr }
        , my_arena_slot{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{ new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{} }
        , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ task_dispatcher::post_resume_action::none }
        , my_post_resume_arg{ nullptr }
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
    }

    ~thread_data() {
        my_context_list->orphan();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
        poison_pointer(my_context_list);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold);
    void leave_task_dispatcher();
    void propagate_task_group_state(std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state);

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if external thread) or am servicing at the moment (if worker)
    arena* my_arena;

    //! The thread dispatcher client the thread worked with last
    thread_dispatcher_client* my_last_client;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    //! List of task group contexts registered by this thread
    context_list* my_context_list;
#if __TBB_RESUMABLE_TASKS
    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Set the post resume action to perform after resume.
    void set_post_resume_action(task_dispatcher::post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == task_dispatcher::post_resume_action::none, "The post resume action must not be already set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }
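
    // Expected usage (an assumption drawn from the asserts above, not a spec):
    // an action is installed right before a context switch, executed exactly
    // once on the resumed side, and then reset via clear_post_resume_action(),
    // so at most one action is pending at any time.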

    void clear_post_resume_action() {
        my_post_resume_action = task_dispatcher::post_resume_action::none;
        my_post_resume_arg = nullptr;
    }

    //! The post resume action requested after the context switch.
    task_dispatcher::post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};
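
// A minimal usage sketch (illustrative assumption; the real call sites are in
// the scheduler and governor code, not in this header):
//
//     thread_data td{/*index*/ 0, /*is_worker*/ true};
//     td.enter_task_dispatcher(disp, stealing_threshold); // set threshold, attach
//     td.attach_arena(a, slot_index);                     // bind slot and mailbox
//     /* ...the dispatch loop runs... */
//     td.leave_task_dispatcher();                         // reset threshold, detach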

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Attach the current slot's mail_outbox to this thread's mail_inbox (TODO: maybe remove the inbox later)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}

inline void thread_data::enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold) {
    task_disp.set_stealing_threshold(stealing_threshold);
    attach_task_dispatcher(task_disp);
}

inline void thread_data::leave_task_dispatcher() {
    my_task_dispatcher->set_stealing_threshold(0);
    detach_task_dispatcher();
}

inline void thread_data::propagate_task_group_state(std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // The acquire fence is necessary to ensure that the subsequent node->my_next load
    // returns the correct value in case the node was just inserted by another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // reordering of a possible store to *mptr_state after the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}
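
// How the epoch protocol fits together (a sketch, assuming the propagation
// driver outside this header bumps the global counter): a thread changing a
// state bit on a context tree advances the_context_state_propagation_epoch and
// walks every thread's context_list as above; an owner thread can later test
//
//     my_context_list->epoch == the_context_state_propagation_epoch
//
// and skip propagation entirely when they match, which keeps the common path
// of context construction/destruction lock-free (see the epoch comment above).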

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H