/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;

//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{}
        , my_arena_slot{}
        , my_inbox{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list_state{}
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ post_resume_action::none }
        , my_post_resume_arg{nullptr}
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list_state.mutex, SyncType_Scheduler, SyncObj_ContextsList);
        my_context_list_state.head.next.store(&my_context_list_state.head, std::memory_order_relaxed);
        my_context_list_state.head.prev.store(&my_context_list_state.head, std::memory_order_relaxed);
    }

    ~thread_data() {
        context_list_cleanup();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
        poison_value(my_context_list_state.epoch);
        poison_value(my_context_list_state.local_update);
        poison_value(my_context_list_state.nonlocal_update);
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void context_list_cleanup();
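    // mptr_state selects which std::atomic member of d1::task_group_context to update
    // (presumably a propagated flag such as the cancellation request), src is the context
    // where the change originated, and new_state is the value to propagate across the
    // contexts registered in this thread's context list.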
    template <typename T>
    void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if external thread) or am servicing at the moment (if worker)
    arena* my_arena;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random number generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    struct context_list_state {
        //! Head of the thread-specific list of task group contexts.
        d1::context_list_node head{};

        //! Mutex protecting access to the list of task group contexts.
        // TODO: check whether it can be deadly preempted and replace with a spinning/sleeping mutex
        spin_mutex mutex{};

        //! Last state propagation epoch known to this thread
        /** Together with the_context_state_propagation_epoch, this constitutes a synchronization
        protocol that keeps the hot path of task group context construction/destruction
        mostly lock-free.
        When the local epoch equals the global one, the state of the task group contexts
        registered with this thread is consistent with that of the task group trees
        they belong to. **/
        std::atomic<std::uintptr_t> epoch{};

        //! Flag indicating that a context is being destructed by its owner thread
        /** Together with nonlocal_update, this constitutes a synchronization protocol
        that keeps the hot path of context destruction (by the owner thread) mostly
        lock-free. **/
        std::atomic<std::uintptr_t> local_update{};

        //! Flag indicating that a context is being destructed by a non-owner thread.
        /** See also local_update. **/
        std::atomic<std::uintptr_t> nonlocal_update{};
    } my_context_list_state;
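
    // Illustrative sketch (assumed usage of the fields above, not a definitive recipe):
    // the state propagation fast path is expected to compare the thread-local epoch with
    // the global the_context_state_propagation_epoch before falling back to the mutex, e.g.:
    //
    //   if (my_context_list_state.epoch.load(std::memory_order_relaxed)
    //           != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
    //       // Slow path: lock my_context_list_state.mutex and walk the context list.
    //   }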

#if __TBB_RESUMABLE_TASKS
    //! The list of possible post-resume actions.
    enum class post_resume_action {
        invalid,
        register_waiter,
        resume,
        callback,
        cleanup,
        notify,
        none
    };

    //! Wrapper that invokes the user callback passed to tbb::suspend.
    struct suspend_callback_wrapper {
        suspend_callback_type suspend_callback;
        void* user_callback;
        suspend_point_type* tag;

        void operator()() {
            __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
            suspend_callback(user_callback, tag);
        }
    };

    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);
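    // Note: the two raw pointers presumably correspond to the suspend_callback_type function
    // and the user functor that get packed into a suspend_callback_wrapper (see above) before
    // the coroutine switch.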

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post-resume action to perform after resume.
    void set_post_resume_action(post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The post resume action must not be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    void clear_post_resume_action() {
        my_post_resume_action = thread_data::post_resume_action::none;
        my_post_resume_arg = nullptr;
    }

    //! Performs the post-resume action.
    void do_post_resume_action();
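
    // Illustrative sketch (assumed call sequence, inferred from the declarations above):
    // the action is recorded before the stacks are swapped and executed once control
    // returns to a running dispatcher, e.g.:
    //
    //   td->set_post_resume_action(post_resume_action::notify, wait_ctx);
    //   // ... switch to the target coroutine ...
    //   td->do_post_resume_action();
    //
    // (td and wait_ctx are hypothetical names used only for illustration.)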

    //! The post-resume action requested after the context swap.
    post_resume_action my_post_resume_action;

    //! The post-resume action argument.
    void* my_post_resume_arg;
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Attach this thread's mail_inbox to the mail_outbox of the current slot (maybe remove the inbox later)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }
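
// Illustrative sketch (hypothetical usage): once a thread occupies an arena slot it
// attaches to it, after which is_attached_to() reflects the binding:
//
//   td.attach_arena(a, slot_index);              // slot_index is a hypothetical name
//   __TBB_ASSERT(td.is_attached_to(&a), nullptr);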

inline void thread_data::context_list_cleanup() {
    // Detach contexts remaining in the local list.
    {
        spin_mutex::scoped_lock lock(my_context_list_state.mutex);
        d1::context_list_node* node = my_context_list_state.head.next.load(std::memory_order_relaxed);
        while (node != &my_context_list_state.head) {
            using state_t = d1::task_group_context::lifetime_state;

            d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, node);
            std::atomic<state_t>& state = ctx.my_lifetime_state;

            // Advance before the context is potentially released by another thread.
            node = node->next.load(std::memory_order_relaxed);

            __TBB_ASSERT(ctx.my_owner == this, "The context should belong to the current thread.");
            state_t expected = state_t::bound;
            if (
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
                // Workaround for older Intel Compilers: perform the CAS via the underlying integral type.
                !((std::atomic<typename std::underlying_type<state_t>::type>&)state).compare_exchange_strong(
                    (typename std::underlying_type<state_t>::type&)expected,
                    (typename std::underlying_type<state_t>::type)state_t::detached)
#else
                !state.compare_exchange_strong(expected, state_t::detached)
#endif
            ) {
                // The context is locked or dying: another thread is handling it,
                // so wait until it reaches the dying state.
                __TBB_ASSERT(expected == state_t::locked || expected == state_t::dying, nullptr);
                spin_wait_until_eq(state, state_t::dying);
            } else {
                // Detached successfully: clear the owner so later destruction does not touch this thread.
                __TBB_ASSERT(expected == state_t::bound, nullptr);
                ctx.my_owner.store(nullptr, std::memory_order_release);
            }
        }
    }
    // Wait for concurrent destruction of contexts by non-owner threads to complete.
    spin_wait_until_eq(my_context_list_state.nonlocal_update, 0u);
}

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}
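
// Illustrative sketch (assumed pairing): a task_dispatcher is bound to the thread for the
// duration of a dispatch session and unbound afterwards, e.g.:
//
//   td.attach_task_dispatcher(disp);   // disp is a hypothetical task_dispatcher instance
//   // ... dispatch tasks ...
//   td.detach_task_dispatcher();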

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H