/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"
#include "intrusive_list.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;
class thread_dispatcher_client;

class context_list : public intrusive_list<d1::intrusive_list_node> {
public:
    bool orphaned{false};

    //! Last state propagation epoch known to this thread
    /** Together with the_context_state_propagation_epoch, this constitutes a synchronization
        protocol that keeps the hot path of task group context construction/destruction
        mostly lock-free.
        When the local epoch equals the global one, the state of the task group contexts
        registered with this thread is consistent with that of the task group trees
        they belong to. **/
    std::atomic<std::uintptr_t> epoch{};

    //! Mutex protecting access to the list of task group contexts.
    d1::mutex m_mutex{};

    void destroy() {
        this->~context_list();
        cache_aligned_deallocate(this);
    }

    void remove(d1::intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<d1::intrusive_list_node>::remove(val);

        if (orphaned && empty()) {
            lock.release();
            destroy();
        }
    }

    void push_front(d1::intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<d1::intrusive_list_node>::push_front(val);
    }

    void orphan() {
        mutex::scoped_lock lock(m_mutex);

        orphaned = true;
        if (empty()) {
            lock.release();
            destroy();
        }
    }
};

//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public d1::intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{ nullptr }
        , my_last_client{ nullptr }
        , my_arena_slot{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ task_dispatcher::post_resume_action::none }
        , my_post_resume_arg{nullptr}
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
    }

    ~thread_data() {
        my_context_list->orphan();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
        poison_pointer(my_context_list);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold);
    void leave_task_dispatcher();
    void propagate_task_group_state(std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state);

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if external thread) or am servicing at the moment (if worker)
    arena* my_arena;

    //! The last thread_dispatcher_client this thread was associated with
    thread_dispatcher_client* my_last_client;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random number generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    //! The list of task group contexts registered with this thread
    context_list* my_context_list;
#if __TBB_RESUMABLE_TASKS
    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post resume action to perform after resume.
    void set_post_resume_action(task_dispatcher::post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == task_dispatcher::post_resume_action::none, "The post resume action must not be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    void clear_post_resume_action() {
        my_post_resume_action = task_dispatcher::post_resume_action::none;
        my_post_resume_arg = nullptr;
    }

    //! The post resume action requested after the context switch.
    task_dispatcher::post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}

inline void thread_data::enter_task_dispatcher(task_dispatcher& task_disp, std::uintptr_t stealing_threshold) {
    task_disp.set_stealing_threshold(stealing_threshold);
    attach_task_dispatcher(task_disp);
}

inline void thread_data::leave_task_dispatcher() {
    my_task_dispatcher->set_stealing_threshold(0);
    detach_task_dispatcher();
}

inline void thread_data::propagate_task_group_state(std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // An acquire fence is necessary to ensure that the subsequent node->my_next load
    // returns the correct value in case the node was just inserted by another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up local propagation epoch with the global one. Release fence prevents
    // reordering of possible store to *mptr_state after the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H
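
//------------------------------------------------------------------------
// Illustrative sketch (exposition only): the epoch-based protocol behind
// context_list::epoch and the_context_state_propagation_epoch can be modeled
// in isolation roughly as below. All names in the sketch (global_epoch,
// per_thread_context_list, is_up_to_date, sync_epoch) are hypothetical and do
// not exist in TBB.
//
//     #include <atomic>
//     #include <cstdint>
//
//     // Bumped once per global task-group-state propagation.
//     inline std::atomic<std::uintptr_t> global_epoch{0};
//
//     struct per_thread_context_list {
//         // Last propagation epoch observed by this thread.
//         std::atomic<std::uintptr_t> local_epoch{0};
//
//         // Hot path (e.g. context construction/destruction): when the local
//         // epoch matches the global one, no propagation is pending and the
//         // locked slow path can be skipped.
//         bool is_up_to_date() const {
//             return local_epoch.load(std::memory_order_relaxed) ==
//                    global_epoch.load(std::memory_order_relaxed);
//         }
//
//         // Slow path, mirroring thread_data::propagate_task_group_state
//         // above: after walking the context list under its mutex, publish
//         // the new epoch with a release store so that earlier state updates
//         // are not reordered past this point.
//         void sync_epoch() {
//             local_epoch.store(global_epoch.load(std::memory_order_relaxed),
//                               std::memory_order_release);
//         }
//     };
//------------------------------------------------------------------------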