/*
    Copyright (c) 2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;

//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{}
        , my_arena_slot{}
        , my_inbox{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list_state{}
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ post_resume_action::none }
        , my_post_resume_arg{ nullptr }
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list_state.mutex, SyncType_Scheduler, SyncObj_ContextsList);
        my_context_list_state.head.next.store(&my_context_list_state.head, std::memory_order_relaxed);
        my_context_list_state.head.prev.store(&my_context_list_state.head, std::memory_order_relaxed);
    }

    ~thread_data() {
        context_list_cleanup();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
        poison_value(my_context_list_state.epoch);
        poison_value(my_context_list_state.local_update);
        poison_value(my_context_list_state.nonlocal_update);
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void context_list_cleanup();
    template <typename T>
    void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);

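    // Illustrative use of propagate_task_group_state (a hypothetical sketch, not a verbatim
    // call site): the thread walks its local context list and stores new_state through
    // mptr_state into every context affected by the source context. Assuming the cancellation
    // flag is the std::atomic member d1::task_group_context::my_cancellation_requested,
    // a call could look like
    //     td->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested,
    //                                    src_ctx, std::uint32_t(1));
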
    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if master) or am servicing at the moment (if worker)
    arena* my_arena;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    struct context_list_state {
        //! Head of the thread-specific list of task group contexts.
        d1::context_list_node head{};

        //! Mutex protecting access to the list of task group contexts.
        // TODO: check whether it can be harmfully preempted and replace it with a spinning/sleeping mutex
        spin_mutex mutex{};

        //! Last state propagation epoch known to this thread
        /** Together with the_context_state_propagation_epoch, it constitutes a synchronization
            protocol that keeps the hot path of task group context construction/destruction
            mostly lock-free.
            When the local epoch equals the global one, the state of the task group contexts
            registered with this thread is consistent with that of the task group trees
            they belong to. **/
        std::atomic<std::uintptr_t> epoch{};

        //! Flag indicating that a context is being destructed by its owner thread
        /** Together with nonlocal_update, it constitutes a synchronization protocol that keeps
            the hot path of context destruction (by the owner thread) mostly lock-free. **/
        std::atomic<std::uintptr_t> local_update{};

        //! Flag indicating that a context is being destructed by a non-owner thread.
        /** See also local_update. **/
        std::atomic<std::uintptr_t> nonlocal_update{};
    } my_context_list_state;

#if __TBB_RESUMABLE_TASKS
    //! The list of possible post resume actions.
    enum class post_resume_action {
        invalid,
        register_waiter,
        callback,
        cleanup,
        notify,
        none
    };

    //! The wrapper that invokes the user callback passed to tbb::suspend.
    struct suspend_callback_wrapper {
        suspend_callback_type suspend_callback;
        void* user_callback;
        suspend_point_type* tag;

        void operator()() {
            __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
            suspend_callback(user_callback, tag);
        }
    };

    struct register_waiter_data {
        d1::wait_context* wo;
        concurrent_monitor::resume_context& node;
    };

    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post resume action to perform after resume.
    void set_post_resume_action(post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The post resume action must not be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    //! Performs the post resume action.
    void do_post_resume_action();

    //! The post resume action requested after the contexts are swapped.
    post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;

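    // Illustrative post-resume flow (a sketch; the actual call sites live in the task dispatcher):
    //     td->set_post_resume_action(post_resume_action::notify, arg); // record the deferred work
    //     td->resume(target);                                          // switch to the target coroutine
    //     // ...the code that runs after the switch then calls:
    //     td->do_post_resume_action();  // performs the recorded action; the state must be back to
    //                                   // post_resume_action::none before the next
    //                                   // set_post_resume_action, as the assertions above require
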
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Read the current slot's mail_outbox and attach it to the mail_inbox (the inbox may be removed later)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }

inline void thread_data::context_list_cleanup() {
    // Detach contexts remaining in the local list.
    {
        spin_mutex::scoped_lock lock(my_context_list_state.mutex);
        d1::context_list_node* node = my_context_list_state.head.next.load(std::memory_order_relaxed);
        while (node != &my_context_list_state.head) {
            using state_t = d1::task_group_context::lifetime_state;

            d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, node);
            std::atomic<state_t>& state = ctx.my_lifetime_state;

            node = node->next.load(std::memory_order_relaxed);

            __TBB_ASSERT(ctx.my_owner == this, "The context should belong to the current thread.");
            state_t expected = state_t::bound;
            // Try to atomically detach the context; if it is transiently locked or already being
            // destroyed by another thread, wait until it reaches the dying state instead.
            if (
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
                !((std::atomic<typename std::underlying_type<state_t>::type>&)state).compare_exchange_strong(
                    (typename std::underlying_type<state_t>::type&)expected,
                    (typename std::underlying_type<state_t>::type)state_t::detached)
#else
                !state.compare_exchange_strong(expected, state_t::detached)
#endif
            ) {
                __TBB_ASSERT(expected == state_t::locked || expected == state_t::dying, nullptr);
                spin_wait_until_eq(state, state_t::dying);
            } else {
                __TBB_ASSERT(expected == state_t::bound, nullptr);
                ctx.my_owner = nullptr;
            }
        }
    }
    // Wait until non-owner threads that are concurrently destroying contexts registered with
    // this thread finish unlinking them from the list.
    spin_wait_until_eq(my_context_list_state.nonlocal_update, 0u);
}

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H