/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"
#include "intrusive_list.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;
class thread_dispatcher_client;

class context_list : public intrusive_list<d1::intrusive_list_node> {
public:
    bool orphaned{false};

    //! Last state propagation epoch known to this thread
    /** Together with the_context_state_propagation_epoch, it constitutes a synchronization
    protocol that keeps the hot path of task group context construction/destruction
    mostly lock-free.
    When the local epoch equals the global one, the state of the task group contexts
    registered with this thread is consistent with that of the task group trees
    they belong to. **/
    std::atomic<std::uintptr_t> epoch{};

    //! Mutex protecting access to the list of task group contexts.
    d1::mutex m_mutex{};

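    //! Destroys and deallocates this context_list instance.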
    void destroy() {
        this->~context_list();
        cache_aligned_deallocate(this);
    }

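    //! Removes a context from the list; deallocates the list if it is orphaned and becomes empty.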
    void remove(d1::intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<d1::intrusive_list_node>::remove(val);

        if (orphaned && empty()) {
            lock.release();
            destroy();
        }
    }

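    //! Adds a context to the front of the list.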
    void push_front(d1::intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<d1::intrusive_list_node>::push_front(val);
    }

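    //! Marks the list as abandoned by its owner thread; deallocates it if it is already empty.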
    void orphan() {
        mutex::scoped_lock lock(m_mutex);

        orphaned = true;
        if (empty()) {
            lock.release();
            destroy();
        }
    }
};
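
// Lifetime note: a thread_data instance orphans its context_list in its destructor (see below),
// while contexts unregister themselves via remove(). Whichever call observes the list both
// orphaned and empty, orphan() itself or the last remove(), deallocates the list, so registered
// contexts may safely outlive the thread that created them.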

//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public d1::intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{ nullptr }
        , my_last_client{ nullptr }
        , my_arena_slot{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ task_dispatcher::post_resume_action::none }
        , my_post_resume_arg{nullptr}
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
    }

    ~thread_data() {
        my_context_list->orphan();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
        poison_pointer(my_context_list);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold);
    void leave_task_dispatcher();
    void propagate_task_group_state(std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state);

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread was created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if external thread) or am servicing at the moment (if worker)
    arena* my_arena;

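    //! The thread_dispatcher_client the thread was most recently associated with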
    thread_dispatcher_client* my_last_client;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

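    //! The list of task group contexts registered with this thread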
    context_list* my_context_list;
#if __TBB_RESUMABLE_TASKS
    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post resume action to perform after resume.
    void set_post_resume_action(task_dispatcher::post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == task_dispatcher::post_resume_action::none, "The post resume action must not be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    void clear_post_resume_action() {
        my_post_resume_action = task_dispatcher::post_resume_action::none;
        my_post_resume_arg = nullptr;
    }
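
    //! Note: at most one post resume action may be pending at a time; a previously set action
    //! must be cleared via clear_post_resume_action() before a new one is set (see the asserts above).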

    //! The post resume action requested after the context switch.
    task_dispatcher::post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Attach the current slot's mail_outbox to this thread's mail_inbox (the inbox may be removed later)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }
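
// Usage sketch (illustrative, not a definitive call sequence): when a thread joins arena `a`
// at slot `index`, attach_arena() wires up the arena pointer, the slot pointer, and the mailbox, e.g.
//   td->attach_arena(a, index);
//   __TBB_ASSERT(td->is_attached_to(&a), nullptr);
// where `td` denotes the calling thread's thread_data instance.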

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}

inline void thread_data::enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold) {
    task_disp.set_stealing_threshold(stealing_threshold);
    attach_task_dispatcher(task_disp);
}

inline void thread_data::leave_task_dispatcher() {
    my_task_dispatcher->set_stealing_threshold(0);
    detach_task_dispatcher();
}
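
// Pairing sketch (illustrative): enter_task_dispatcher() and leave_task_dispatcher() are intended
// to bracket a dispatch session, e.g.
//   td.enter_task_dispatcher(disp, stealing_threshold); // threshold computed from the stack bounds elsewhere
//   /* ... run the dispatch loop ... */
//   td.leave_task_dispatcher();
// where `td` and `disp` stand for the caller's thread_data and task_dispatcher instances.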

inline void thread_data::propagate_task_group_state(std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // The acquire fence is necessary to ensure that the subsequent node->my_next load
    // returns the correct value in case the node was just inserted by another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // reordering of a possible store to *mptr_state after the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H