xref: /oneTBB/src/tbb/thread_data.h (revision b95bbc9c)
1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_thread_data_H
18 #define __TBB_thread_data_H
19 
20 #include "oneapi/tbb/detail/_task.h"
21 #include "oneapi/tbb/task.h"
22 
23 #include "rml_base.h" // rml::job
24 
25 #include "scheduler_common.h"
26 #include "arena.h"
27 #include "concurrent_monitor.h"
28 #include "mailbox.h"
29 #include "misc.h" // FastRandom
30 #include "small_object_pool_impl.h"
31 
32 #include <atomic>
33 
34 namespace tbb {
35 namespace detail {
36 namespace r1 {
37 
38 class task;
39 class arena_slot;
40 class task_group_context;
41 class task_dispatcher;
42 
43 class context_list : public intrusive_list<intrusive_list_node> {
44 public:
45     bool orphaned{false};
46 
47     //! Last state propagation epoch known to this thread
48     /** Together with the_context_state_propagation_epoch constitute synchronization protocol
49     that keeps hot path of task group context construction destruction mostly
50     lock-free.
51     When local epoch equals the global one, the state of task group contexts
52     registered with this thread is consistent with that of the task group trees
53     they belong to. **/
54     std::atomic<std::uintptr_t> epoch{};
55 
56     //! Mutex protecting access to the list of task group contexts.
57     d1::mutex m_mutex{};
58 
59     void destroy() {
60         this->~context_list();
61         cache_aligned_deallocate(this);
62     }
63 
64     void remove(intrusive_list_node& val) {
65         mutex::scoped_lock lock(m_mutex);
66 
67         intrusive_list<intrusive_list_node>::remove(val);
68 
69         if (orphaned && empty()) {
70             lock.release();
71             destroy();
72         }
73     }
74 
75     void push_front(intrusive_list_node& val) {
76         mutex::scoped_lock lock(m_mutex);
77 
78         intrusive_list<intrusive_list_node>::push_front(val);
79     }
80 
81     void orphan() {
82         mutex::scoped_lock lock(m_mutex);
83 
84         orphaned = true;
85         if (empty()) {
86             lock.release();
87             destroy();
88         }
89     }
90 };
91 
92 //------------------------------------------------------------------------
93 // Thread Data
94 //------------------------------------------------------------------------
95 class thread_data : public ::rml::job
96                   , public intrusive_list_node
97                   , no_copy {
98 public:
99     thread_data(unsigned short index, bool is_worker)
100         : my_arena_index{ index }
101         , my_is_worker{ is_worker }
102         , my_task_dispatcher{ nullptr }
103         , my_arena{}
104         , my_arena_slot{}
105         , my_random{ this }
106         , my_last_observer{ nullptr }
107         , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
108         , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
109 #if __TBB_RESUMABLE_TASKS
110         , my_post_resume_action{ post_resume_action::none }
111         , my_post_resume_arg{nullptr}
112 #endif /* __TBB_RESUMABLE_TASKS */
113     {
114         ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
115     }
116 
117     ~thread_data() {
118         my_context_list->orphan();
119         my_small_object_pool->destroy();
120         poison_pointer(my_task_dispatcher);
121         poison_pointer(my_arena);
122         poison_pointer(my_arena_slot);
123         poison_pointer(my_last_observer);
124         poison_pointer(my_small_object_pool);
125         poison_pointer(my_context_list);
126 #if __TBB_RESUMABLE_TASKS
127         poison_pointer(my_post_resume_arg);
128 #endif /* __TBB_RESUMABLE_TASKS */
129     }
130 
131     void attach_arena(arena& a, std::size_t index);
132     bool is_attached_to(arena*);
133     void attach_task_dispatcher(task_dispatcher&);
134     void detach_task_dispatcher();
135     void context_list_cleanup();
136     template <typename T>
137     void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);
138 
139     //! Index of the arena slot the scheduler occupies now, or occupied last time
140     unsigned short my_arena_index;
141 
142     //! Indicates if the thread is created by RML
143     const bool my_is_worker;
144 
145     //! The current task dipsatcher
146     task_dispatcher* my_task_dispatcher;
147 
148     //! The arena that I own (if external thread) or am servicing at the moment (if worker)
149     arena* my_arena;
150 
151     //! Pointer to the slot in the arena we own at the moment
152     arena_slot* my_arena_slot;
153 
154     //! The mailbox (affinity mechanism) the current thread attached to
155     mail_inbox my_inbox;
156 
157     //! The random generator
158     FastRandom my_random;
159 
160     //! Last observer in the observers list processed on this slot
161     observer_proxy* my_last_observer;
162 
163     //! Pool of small object for fast task allocation
164     small_object_pool_impl* my_small_object_pool;
165 
166     context_list* my_context_list;
167 #if __TBB_RESUMABLE_TASKS
168     //! The list of possible post resume actions.
169     enum class post_resume_action {
170         invalid,
171         register_waiter,
172         resume,
173         callback,
174         cleanup,
175         notify,
176         none
177     };
178 
179     //! The callback to call the user callback passed to tbb::suspend.
180     struct suspend_callback_wrapper {
181         suspend_callback_type suspend_callback;
182         void* user_callback;
183         suspend_point_type* tag;
184 
185         void operator()() {
186             __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
187             suspend_callback(user_callback, tag);
188         }
189     };
190 
191     //! Suspends the current coroutine (task_dispatcher).
192     void suspend(void* suspend_callback, void* user_callback);
193 
194     //! Resumes the target task_dispatcher.
195     void resume(task_dispatcher& target);
196 
197     //! Set post resume action to perform after resume.
198     void set_post_resume_action(post_resume_action pra, void* arg) {
199         __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The Post resume action must not be set");
200         __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
201         my_post_resume_action = pra;
202         my_post_resume_arg = arg;
203     }
204 
205     void clear_post_resume_action() {
206         my_post_resume_action = thread_data::post_resume_action::none;
207         my_post_resume_arg = nullptr;
208     }
209 
210     //! Performs post resume action.
211     void do_post_resume_action();
212 
213     //! The post resume action requested after the swap contexts.
214     post_resume_action my_post_resume_action;
215 
216     //! The post resume action argument.
217     void* my_post_resume_arg;
218 #endif /* __TBB_RESUMABLE_TASKS */
219 
220     //! The default context
221     // TODO: consider using common default context because it is used only to simplify
222     // cancellation check.
223     d1::task_group_context my_default_context;
224 };
225 
226 inline void thread_data::attach_arena(arena& a, std::size_t index) {
227     my_arena = &a;
228     my_arena_index = static_cast<unsigned short>(index);
229     my_arena_slot = a.my_slots + index;
230     // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
231     my_inbox.attach(my_arena->mailbox(index));
232 }
233 
234 inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }
235 
236 inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
237     __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
238     __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
239     task_disp.m_thread_data = this;
240     my_task_dispatcher = &task_disp;
241 }
242 
243 inline void thread_data::detach_task_dispatcher() {
244     __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
245     __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
246     my_task_dispatcher->m_thread_data = nullptr;
247     my_task_dispatcher = nullptr;
248 }
249 
250 } // namespace r1
251 } // namespace detail
252 } // namespace tbb
253 
254 #endif // __TBB_thread_data_H
255 
256