xref: /oneTBB/src/tbb/thread_data.h (revision e77098d6)
1 /*
2     Copyright (c) 2020-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_thread_data_H
18 #define __TBB_thread_data_H
19 
20 #include "oneapi/tbb/detail/_task.h"
21 #include "oneapi/tbb/task.h"
22 
23 #include "rml_base.h" // rml::job
24 
25 #include "scheduler_common.h"
26 #include "arena.h"
27 #include "concurrent_monitor.h"
28 #include "mailbox.h"
29 #include "misc.h" // FastRandom
30 #include "small_object_pool_impl.h"
31 
32 #include <atomic>
33 
34 namespace tbb {
35 namespace detail {
36 namespace r1 {
37 
38 class task;
39 class arena_slot;
40 class task_group_context;
41 class task_dispatcher;
42 
43 class context_list : public intrusive_list<intrusive_list_node> {
44 public:
45     bool orphaned{false};
46 
47     //! Last state propagation epoch known to this thread
48     /** Together with the_context_state_propagation_epoch constitute synchronization protocol
49     that keeps hot path of task group context construction destruction mostly
50     lock-free.
51     When local epoch equals the global one, the state of task group contexts
52     registered with this thread is consistent with that of the task group trees
53     they belong to. **/
54     std::atomic<std::uintptr_t> epoch{};
55 
56     //! Mutex protecting access to the list of task group contexts.
57     d1::mutex m_mutex{};
58 
59     void destroy() {
60         this->~context_list();
61         cache_aligned_deallocate(this);
62     }
63 
64     void remove(intrusive_list_node& val) {
65         mutex::scoped_lock lock(m_mutex);
66 
67         intrusive_list<intrusive_list_node>::remove(val);
68 
69         if (orphaned && empty()) {
70             lock.release();
71             destroy();
72         }
73     }
74 
75     void push_front(intrusive_list_node& val) {
76         mutex::scoped_lock lock(m_mutex);
77 
78         intrusive_list<intrusive_list_node>::push_front(val);
79     }
80 
81     void orphan() {
82         mutex::scoped_lock lock(m_mutex);
83 
84         orphaned = true;
85         if (empty()) {
86             lock.release();
87             destroy();
88         }
89     }
90 };
91 
92 //------------------------------------------------------------------------
93 // Thread Data
94 //------------------------------------------------------------------------
95 class thread_data : public ::rml::job
96                   , public intrusive_list_node
97                   , no_copy {
98 public:
99     thread_data(unsigned short index, bool is_worker)
100         : my_arena_index{ index }
101         , my_is_worker{ is_worker }
102         , my_task_dispatcher{ nullptr }
103         , my_arena{}
104         , my_arena_slot{}
105         , my_random{ this }
106         , my_last_observer{ nullptr }
107         , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
108         , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
109 #if __TBB_RESUMABLE_TASKS
110         , my_post_resume_action{ task_dispatcher::post_resume_action::none }
111         , my_post_resume_arg{nullptr}
112 #endif /* __TBB_RESUMABLE_TASKS */
113     {
114         ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
115     }
116 
117     ~thread_data() {
118         my_context_list->orphan();
119         my_small_object_pool->destroy();
120         poison_pointer(my_task_dispatcher);
121         poison_pointer(my_arena);
122         poison_pointer(my_arena_slot);
123         poison_pointer(my_last_observer);
124         poison_pointer(my_small_object_pool);
125         poison_pointer(my_context_list);
126 #if __TBB_RESUMABLE_TASKS
127         poison_pointer(my_post_resume_arg);
128 #endif /* __TBB_RESUMABLE_TASKS */
129     }
130 
131     void attach_arena(arena& a, std::size_t index);
132     bool is_attached_to(arena*);
133     void attach_task_dispatcher(task_dispatcher&);
134     void detach_task_dispatcher();
135     void enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold);
136     void leave_task_dispatcher();
137     template <typename T>
138     void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);
139 
140     //! Index of the arena slot the scheduler occupies now, or occupied last time
141     unsigned short my_arena_index;
142 
143     //! Indicates if the thread is created by RML
144     const bool my_is_worker;
145 
146     //! The current task dipsatcher
147     task_dispatcher* my_task_dispatcher;
148 
149     //! The arena that I own (if external thread) or am servicing at the moment (if worker)
150     arena* my_arena;
151 
152     //! Pointer to the slot in the arena we own at the moment
153     arena_slot* my_arena_slot;
154 
155     //! The mailbox (affinity mechanism) the current thread attached to
156     mail_inbox my_inbox;
157 
158     //! The random generator
159     FastRandom my_random;
160 
161     //! Last observer in the observers list processed on this slot
162     observer_proxy* my_last_observer;
163 
164     //! Pool of small object for fast task allocation
165     small_object_pool_impl* my_small_object_pool;
166 
167     context_list* my_context_list;
168 #if __TBB_RESUMABLE_TASKS
169     //! The callback to call the user callback passed to tbb::suspend.
170     struct suspend_callback_wrapper {
171         suspend_callback_type suspend_callback;
172         void* user_callback;
173         suspend_point_type* sp;
174 
175         void operator()() {
176             __TBB_ASSERT(suspend_callback && user_callback && sp, nullptr);
177             suspend_callback(user_callback, sp);
178         }
179     };
180 
181     //! Suspends the current coroutine (task_dispatcher).
182     void suspend(void* suspend_callback, void* user_callback);
183 
184     //! Resumes the target task_dispatcher.
185     void resume(task_dispatcher& target);
186 
187     //! Set post resume action to perform after resume.
188     void set_post_resume_action(task_dispatcher::post_resume_action pra, void* arg) {
189         __TBB_ASSERT(my_post_resume_action == task_dispatcher::post_resume_action::none, "The Post resume action must not be set");
190         __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
191         my_post_resume_action = pra;
192         my_post_resume_arg = arg;
193     }
194 
195     void clear_post_resume_action() {
196         my_post_resume_action = task_dispatcher::post_resume_action::none;
197         my_post_resume_arg = nullptr;
198     }
199 
200     //! The post resume action requested after the swap contexts.
201     task_dispatcher::post_resume_action my_post_resume_action;
202 
203     //! The post resume action argument.
204     void* my_post_resume_arg;
205 #endif /* __TBB_RESUMABLE_TASKS */
206 
207     //! The default context
208     // TODO: consider using common default context because it is used only to simplify
209     // cancellation check.
210     d1::task_group_context my_default_context;
211 };
212 
213 inline void thread_data::attach_arena(arena& a, std::size_t index) {
214     my_arena = &a;
215     my_arena_index = static_cast<unsigned short>(index);
216     my_arena_slot = a.my_slots + index;
217     // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
218     my_inbox.attach(my_arena->mailbox(index));
219 }
220 
221 inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }
222 
223 inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
224     __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
225     __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
226     task_disp.m_thread_data = this;
227     my_task_dispatcher = &task_disp;
228 }
229 
230 inline void thread_data::detach_task_dispatcher() {
231     __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
232     __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
233     my_task_dispatcher->m_thread_data = nullptr;
234     my_task_dispatcher = nullptr;
235 }
236 
237 inline void thread_data::enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold) {
238     task_disp.set_stealing_threshold(stealing_threshold);
239     attach_task_dispatcher(task_disp);
240 }
241 
242 inline void thread_data::leave_task_dispatcher() {
243     my_task_dispatcher->set_stealing_threshold(0);
244     detach_task_dispatcher();
245 }
246 
247 } // namespace r1
248 } // namespace detail
249 } // namespace tbb
250 
251 #endif // __TBB_thread_data_H
252 
253