/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;

//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{}
        , my_arena_slot{}
        , my_inbox{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list_state{}
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ post_resume_action::none }
        , my_post_resume_arg{nullptr}
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list_state.mutex, SyncType_Scheduler, SyncObj_ContextsList);
        my_context_list_state.head.next.store(&my_context_list_state.head, std::memory_order_relaxed);
        my_context_list_state.head.prev.store(&my_context_list_state.head, std::memory_order_relaxed);
    }

    ~thread_data() {
        context_list_cleanup();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
        poison_value(my_context_list_state.epoch);
        poison_value(my_context_list_state.local_update);
        poison_value(my_context_list_state.nonlocal_update);
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void context_list_cleanup();
    template <typename T>
    void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);
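
    // A minimal lifecycle sketch (illustrative only; `disp` and `a` are hypothetical
    // placeholders, and the real call sites elsewhere in the scheduler may wire things
    // up differently):
    //
    //     thread_data td{ /*index*/ 0, /*is_worker*/ false };
    //     td.attach_task_dispatcher(disp);      // bind a task_dispatcher to this thread
    //     td.attach_arena(a, /*slot index*/ 0); // occupy slot 0 of arena `a`
    //     // ... dispatch loop runs ...
    //     td.detach_task_dispatcher();          // unwind before the thread_data is destroyed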

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if external thread) or am servicing at the moment (if worker)
    arena* my_arena;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    struct context_list_state {
        //! Head of the thread-specific list of task group contexts.
        d1::context_list_node head{};

        //! Mutex protecting access to the list of task group contexts.
        // TODO: check whether it can be deadly preempted and replace by spinning/sleeping mutex
        spin_mutex mutex{};

        //! Last state propagation epoch known to this thread.
        /** Together with the_context_state_propagation_epoch, constitutes a synchronization
            protocol that keeps the hot path of task group context construction/destruction
            mostly lock-free.
            When the local epoch equals the global one, the state of the task group contexts
            registered with this thread is consistent with that of the task group trees
            they belong to. **/
        std::atomic<std::uintptr_t> epoch{};

        //! Flag indicating that a context is being destructed by its owner thread.
        /** Together with nonlocal_update, constitutes a synchronization protocol
            that keeps the hot path of context destruction (by the owner thread) mostly
            lock-free. **/
        std::atomic<std::uintptr_t> local_update{};

        //! Flag indicating that a context is being destructed by a non-owner thread.
        /** See also local_update. **/
        std::atomic<std::uintptr_t> nonlocal_update{};
    } my_context_list_state;

#if __TBB_RESUMABLE_TASKS
    //! The list of possible post resume actions.
    enum class post_resume_action {
        invalid,
        register_waiter,
        resume,
        callback,
        cleanup,
        notify,
        none
    };

    //! Wraps the user callback passed to tbb::suspend so it can be invoked later.
    struct suspend_callback_wrapper {
        suspend_callback_type suspend_callback;
        void* user_callback;
        suspend_point_type* tag;

        void operator()() {
            __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
            suspend_callback(user_callback, tag);
        }
    };

    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post resume action to perform after the resume.
    void set_post_resume_action(post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The post resume action must not be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    void clear_post_resume_action() {
        my_post_resume_action = thread_data::post_resume_action::none;
        my_post_resume_arg = nullptr;
    }

    //! Performs the post resume action.
    void do_post_resume_action();

    //! The post resume action requested after swapping contexts.
    post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;
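
    // Sketch of the intended post resume protocol (an assumption drawn from the asserts
    // in set_post_resume_action, not a definitive call sequence): an action is staged
    // before the context switch and consumed exactly once on the other side.
    //
    //     td.set_post_resume_action(post_resume_action::notify, arg); // `arg` is a hypothetical payload
    //     td.resume(target);            // switch to the target task_dispatcher
    //     // ... the resumed side later calls:
    //     td.do_post_resume_action();   // expected to reset the action back to `none`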
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }

inline void thread_data::context_list_cleanup() {
    // Detach contexts remaining in the local list.
    {
        spin_mutex::scoped_lock lock(my_context_list_state.mutex);
        d1::context_list_node* node = my_context_list_state.head.next.load(std::memory_order_relaxed);
        while (node != &my_context_list_state.head) {
            using state_t = d1::task_group_context::lifetime_state;

            d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, node);
            std::atomic<state_t>& state = ctx.my_lifetime_state;

            node = node->next.load(std::memory_order_relaxed);

            __TBB_ASSERT(ctx.my_owner == this, "The context should belong to the current thread.");
            state_t expected = state_t::bound;
            if (
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
                !((std::atomic<typename std::underlying_type<state_t>::type>&)state).compare_exchange_strong(
                    (typename std::underlying_type<state_t>::type&)expected,
                    (typename std::underlying_type<state_t>::type)state_t::detached)
#else
                !state.compare_exchange_strong(expected, state_t::detached)
#endif
            ) {
                __TBB_ASSERT(expected == state_t::locked || expected == state_t::dying, nullptr);
                spin_wait_until_eq(state, state_t::dying);
            } else {
                __TBB_ASSERT(expected == state_t::bound, nullptr);
                ctx.my_owner.store(nullptr, std::memory_order_release);
            }
        }
    }
    spin_wait_until_eq(my_context_list_state.nonlocal_update, 0u);
}

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H