/*
    Copyright (c) 2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;

//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{}
        , my_arena_slot{}
        , my_inbox{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list_state{}
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ post_resume_action::none }
        , my_post_resume_arg{nullptr}
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list_state.mutex, SyncType_Scheduler, SyncObj_ContextsList);
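        // The next two stores make the head node point to itself in both directions,
        // i.e. the thread's context list starts out as an empty circular doubly-linked list.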
        my_context_list_state.head.next.store(&my_context_list_state.head, std::memory_order_relaxed);
        my_context_list_state.head.prev.store(&my_context_list_state.head, std::memory_order_relaxed);
    }

    ~thread_data() {
        context_list_cleanup();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
        poison_value(my_context_list_state.epoch);
        poison_value(my_context_list_state.local_update);
        poison_value(my_context_list_state.nonlocal_update);
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void context_list_cleanup();
    template <typename T>
    void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if master) or am servicing at the moment (if worker)
    arena* my_arena;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    struct context_list_state {
        //! Head of the thread-specific list of task group contexts.
        d1::context_list_node head{};

        //! Mutex protecting access to the list of task group contexts.
        // TODO: check whether it can be deadly preempted and replaced by a spinning/sleeping mutex
        spin_mutex mutex{};

        //! Last state propagation epoch known to this thread
        /** Together with the_context_state_propagation_epoch it constitutes a synchronization
        protocol that keeps the hot path of task group context construction/destruction
        mostly lock-free.
        When the local epoch equals the global one, the state of the task group contexts
        registered with this thread is consistent with that of the task group trees
        they belong to. **/
        std::atomic<std::uintptr_t> epoch{};

        //! Flag indicating that a context is being destructed by its owner thread
        /** Together with nonlocal_update it constitutes a synchronization protocol
        that keeps the hot path of context destruction (by the owner thread) mostly
        lock-free. **/
        std::atomic<std::uintptr_t> local_update{};

        //! Flag indicating that a context is being destructed by a non-owner thread.
        /** See also local_update. **/
        std::atomic<std::uintptr_t> nonlocal_update{};
    } my_context_list_state;
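    // Illustrative sketch (not code from this file): the intended fast-path use of the
    // epoch counter above. Only the global name the_context_state_propagation_epoch is
    // taken from the comment; the surrounding control flow is an assumption.
    //
    //   if (my_context_list_state.epoch.load(std::memory_order_acquire)
    //       == the_context_state_propagation_epoch) {
    //       // No state propagation happened since this thread last synchronized,
    //       // so its registered contexts are already up to date and no lock is taken.
    //   } else {
    //       // Slow path: take my_context_list_state.mutex, walk the list to propagate
    //       // the new state, then bring the local epoch up to the global one.
    //   }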

#if __TBB_RESUMABLE_TASKS
    //! The list of possible post resume actions.
    enum class post_resume_action {
        invalid,
        register_waiter,
        callback,
        cleanup,
        notify,
        none
    };

    //! Wrapper that invokes the user callback passed to tbb::suspend.
    struct suspend_callback_wrapper {
        suspend_callback_type suspend_callback;
        void* user_callback;
        suspend_point_type* tag;

        void operator()() {
            __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
            suspend_callback(user_callback, tag);
        }
    };
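    // Illustrative sketch (an assumption, not code from this file): the wrapper is
    // presumably filled in by the suspend machinery and invoked once, after the thread
    // has switched away from the suspended coroutine. Names my_cb, user_cb and sp are
    // hypothetical.
    //
    //   suspend_callback_wrapper cb{ /*type-erased trampoline*/ my_cb,
    //                                /*user callable*/ user_cb,
    //                                /*suspend point of the old coroutine*/ sp };
    //   cb(); // asserts all three fields are set, then calls my_cb(user_cb, sp)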

    struct register_waiter_data {
        d1::wait_context* wo;
        concurrent_monitor::resume_context& node;
    };

    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post resume action to perform after resume.
    void set_post_resume_action(post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The post resume action must not already be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    //! Performs the post resume action.
    void do_post_resume_action();
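    // Illustrative sketch (an assumption about the intended protocol, not code from
    // this file): a post resume action is recorded before switching coroutines and
    // performed on the other side of the switch. Names wait_flag and target_dispatcher
    // are hypothetical.
    //
    //   td->set_post_resume_action(post_resume_action::notify, &wait_flag);
    //   td->resume(target_dispatcher);   // switch to another task_dispatcher
    //   ...
    //   td->do_post_resume_action();     // executed after the context swap completes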

    //! The post resume action requested after the context swap.
    post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Attach the slot's mail_outbox to this thread's mail_inbox (TODO: consider removing the inbox later)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }
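// Illustrative sketch (an assumption about typical call sites, not code from this
// file): when a thread joins an arena, the scheduler binds its thread_data to the slot
// it occupies, after which is_attached_to() reports the binding. Names a and
// slot_index are hypothetical.
//
//   thread_data td{ /*index*/ 0, /*is_worker*/ true };
//   td.attach_arena(a, slot_index);            // also attaches the slot's mailbox
//   __TBB_ASSERT(td.is_attached_to(&a), nullptr);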

inline void thread_data::context_list_cleanup() {
    // Detach contexts remaining in the local list.
    {
        spin_mutex::scoped_lock lock(my_context_list_state.mutex);
        d1::context_list_node* node = my_context_list_state.head.next.load(std::memory_order_relaxed);
        while (node != &my_context_list_state.head) {
            using state_t = d1::task_group_context::lifetime_state;

            d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, node);
            std::atomic<state_t>& state = ctx.my_lifetime_state;

            node = node->next.load(std::memory_order_relaxed);

            __TBB_ASSERT(ctx.my_owner == this, "The context should belong to the current thread.");
            state_t expected = state_t::bound;
            if (
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
                !((std::atomic<typename std::underlying_type<state_t>::type>&)state).compare_exchange_strong(
                    (typename std::underlying_type<state_t>::type&)expected,
                    (typename std::underlying_type<state_t>::type)state_t::detached)
#else
                !state.compare_exchange_strong(expected, state_t::detached)
#endif
            ) {
                __TBB_ASSERT(expected == state_t::locked || expected == state_t::dying, nullptr);
                spin_wait_until_eq(state, state_t::dying);
            } else {
                __TBB_ASSERT(expected == state_t::bound, nullptr);
                ctx.my_owner = nullptr;
            }
        }
    }
    spin_wait_until_eq(my_context_list_state.nonlocal_update, 0u);
}
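// A short summary of what the CAS above distinguishes (derived from this code; the
// exact lifetime_state semantics live with task_group_context):
//   bound -> detached : this thread gives up ownership, so the context can presumably
//                       be destroyed later without unlinking from this (soon dead) list.
//   locked / dying    : another thread is concurrently working on the context, so wait
//                       until it reaches dying before moving on.
// The final spin on nonlocal_update ensures no non-owner thread is still updating this
// thread's context list when the thread_data goes away.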

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}
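// Illustrative sketch (an assumption, not code from this file): attach/detach maintain
// a strict symmetric 1:1 link between a thread and the task_dispatcher it currently
// runs, which is exactly what the assertions above enforce. The name disp is
// hypothetical.
//
//   td.attach_task_dispatcher(disp);   // td and disp now point at each other
//   ...                                // the dispatch loop runs on this thread
//   td.detach_task_dispatcher();       // both links must be cleared before re-attaching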

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H