/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_thread_data_H
#define __TBB_thread_data_H

#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/task.h"

#include "rml_base.h" // rml::job

#include "scheduler_common.h"
#include "arena.h"
#include "concurrent_monitor.h"
#include "mailbox.h"
#include "misc.h" // FastRandom
#include "small_object_pool_impl.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

class task;
class arena_slot;
class task_group_context;
class task_dispatcher;

//! Intrusive list of the task group contexts registered with a single thread.
class context_list : public intrusive_list<intrusive_list_node> {
public:
    //! Set when the owner thread terminates; the list destroys itself once it is also empty.
    bool orphaned{false};

    //! Last state propagation epoch known to this thread
    /** Together with the_context_state_propagation_epoch, this constitutes a synchronization
        protocol that keeps the hot path of task group context construction/destruction mostly
        lock-free.
        When the local epoch equals the global one, the state of the task group contexts
        registered with this thread is consistent with that of the task group trees
        they belong to. **/
    std::atomic<std::uintptr_t> epoch{};

    //! Mutex protecting access to the list of task group contexts.
    d1::mutex m_mutex{};

    //! Destroys and deallocates this list.
    void destroy() {
        this->~context_list();
        cache_aligned_deallocate(this);
    }

    //! Unregisters a context; the removal that leaves an orphaned list empty also destroys the list.
    void remove(intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<intrusive_list_node>::remove(val);

        if (orphaned && empty()) {
            lock.release();
            destroy();
        }
    }

    //! Registers a context with this list.
    void push_front(intrusive_list_node& val) {
        mutex::scoped_lock lock(m_mutex);

        intrusive_list<intrusive_list_node>::push_front(val);
    }

    //! Marks the list as abandoned by its owner thread; destroys it right away if it is already empty.
    void orphan() {
        mutex::scoped_lock lock(m_mutex);

        orphaned = true;
        if (empty()) {
            lock.release();
            destroy();
        }
    }
};
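// Illustrative sketch (not part of the header): the orphaned/destroy protocol above makes
// whichever party touches the list last release it. Hypothetical call sequence, assuming
// `list` is a thread's context_list and `node` is the intrusive node of one registered
// task_group_context:
//
//     list->push_front(node);   // context registered on construction
//     list->orphan();           // owner thread exits; list survives because it is not empty
//     list->remove(node);       // last context unregisters; the list destroys itself here
//
// If the thread instead outlives all of its contexts, orphan() finds the list empty and
// performs the final destroy() itself.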
//------------------------------------------------------------------------
// Thread Data
//------------------------------------------------------------------------
class thread_data : public ::rml::job
                  , public intrusive_list_node
                  , no_copy {
public:
    thread_data(unsigned short index, bool is_worker)
        : my_arena_index{ index }
        , my_is_worker{ is_worker }
        , my_task_dispatcher{ nullptr }
        , my_arena{}
        , my_arena_slot{}
        , my_random{ this }
        , my_last_observer{ nullptr }
        , my_small_object_pool{new (cache_aligned_allocate(sizeof(small_object_pool_impl))) small_object_pool_impl{}}
        , my_context_list(new (cache_aligned_allocate(sizeof(context_list))) context_list{})
#if __TBB_RESUMABLE_TASKS
        , my_post_resume_action{ post_resume_action::none }
        , my_post_resume_arg{nullptr}
#endif /* __TBB_RESUMABLE_TASKS */
    {
        ITT_SYNC_CREATE(&my_context_list->m_mutex, SyncType_Scheduler, SyncObj_ContextsList);
    }

    ~thread_data() {
        my_context_list->orphan();
        my_small_object_pool->destroy();
        poison_pointer(my_task_dispatcher);
        poison_pointer(my_arena);
        poison_pointer(my_arena_slot);
        poison_pointer(my_last_observer);
        poison_pointer(my_small_object_pool);
        poison_pointer(my_context_list);
#if __TBB_RESUMABLE_TASKS
        poison_pointer(my_post_resume_arg);
#endif /* __TBB_RESUMABLE_TASKS */
    }

    void attach_arena(arena& a, std::size_t index);
    bool is_attached_to(arena*);
    void attach_task_dispatcher(task_dispatcher&);
    void detach_task_dispatcher();
    void context_list_cleanup();
    template <typename T>
    void propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state);

    //! Index of the arena slot the scheduler occupies now, or occupied last time
    unsigned short my_arena_index;

    //! Indicates if the thread is created by RML
    const bool my_is_worker;

    //! The current task dispatcher
    task_dispatcher* my_task_dispatcher;

    //! The arena that I own (if external thread) or am servicing at the moment (if worker)
    arena* my_arena;

    //! Pointer to the slot in the arena we own at the moment
    arena_slot* my_arena_slot;

    //! The mailbox (affinity mechanism) the current thread is attached to
    mail_inbox my_inbox;

    //! The random generator
    FastRandom my_random;

    //! Last observer in the observers list processed on this slot
    observer_proxy* my_last_observer;

    //! Pool of small objects for fast task allocation
    small_object_pool_impl* my_small_object_pool;

    //! The list of task group contexts registered with this thread
    context_list* my_context_list;

#if __TBB_RESUMABLE_TASKS
    //! The list of possible post resume actions.
    enum class post_resume_action {
        invalid,
        register_waiter,
        resume,
        callback,
        cleanup,
        notify,
        none
    };

    //! The callback to call the user callback passed to tbb::suspend.
    struct suspend_callback_wrapper {
        suspend_callback_type suspend_callback;
        void* user_callback;
        suspend_point_type* tag;

        void operator()() {
            __TBB_ASSERT(suspend_callback && user_callback && tag, nullptr);
            suspend_callback(user_callback, tag);
        }
    };

    //! Suspends the current coroutine (task_dispatcher).
    void suspend(void* suspend_callback, void* user_callback);

    //! Resumes the target task_dispatcher.
    void resume(task_dispatcher& target);

    //! Sets the post resume action to perform after resume.
    void set_post_resume_action(post_resume_action pra, void* arg) {
        __TBB_ASSERT(my_post_resume_action == post_resume_action::none, "The post resume action must not be set");
        __TBB_ASSERT(!my_post_resume_arg, "The post resume action must not have an argument");
        my_post_resume_action = pra;
        my_post_resume_arg = arg;
    }

    //! Resets the post resume action to none.
    void clear_post_resume_action() {
        my_post_resume_action = thread_data::post_resume_action::none;
        my_post_resume_arg = nullptr;
    }

    //! Performs the post resume action.
    void do_post_resume_action();

    //! The post resume action requested after the context swap.
    post_resume_action my_post_resume_action;

    //! The post resume action argument.
    void* my_post_resume_arg;
#endif /* __TBB_RESUMABLE_TASKS */

    //! The default context
    // TODO: consider using a common default context because it is used only to simplify
    // the cancellation check.
    d1::task_group_context my_default_context;
};

inline void thread_data::attach_arena(arena& a, std::size_t index) {
    my_arena = &a;
    my_arena_index = static_cast<unsigned short>(index);
    my_arena_slot = a.my_slots + index;
    // Read the current slot mail_outbox and attach it to the mail_inbox (remove inbox later maybe)
    my_inbox.attach(my_arena->mailbox(index));
}

inline bool thread_data::is_attached_to(arena* a) { return my_arena == a; }

inline void thread_data::attach_task_dispatcher(task_dispatcher& task_disp) {
    __TBB_ASSERT(my_task_dispatcher == nullptr, nullptr);
    __TBB_ASSERT(task_disp.m_thread_data == nullptr, nullptr);
    task_disp.m_thread_data = this;
    my_task_dispatcher = &task_disp;
}

inline void thread_data::detach_task_dispatcher() {
    __TBB_ASSERT(my_task_dispatcher != nullptr, nullptr);
    __TBB_ASSERT(my_task_dispatcher->m_thread_data == this, nullptr);
    my_task_dispatcher->m_thread_data = nullptr;
    my_task_dispatcher = nullptr;
}
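// Illustrative lifecycle sketch (not part of the header): thread_data is the per-thread
// scheduler state, so a thread typically wires it up before dispatching tasks and unwires
// it symmetrically afterwards. Hypothetical flow, assuming `td`, `disp`, `a`, and
// `slot_index` are a thread_data, a task_dispatcher, an arena, and a slot index owned by
// the calling code:
//
//     td.attach_arena(a, slot_index);    // bind to an arena slot and its mailbox
//     td.attach_task_dispatcher(disp);   // pair with a dispatcher (asserts it was unpaired)
//     /* ... task execution ... */
//     td.detach_task_dispatcher();       // unpair before the thread leaves the arena
//
// With __TBB_RESUMABLE_TASKS, suspend()/resume() additionally rely on the post resume
// action pair: the action recorded via set_post_resume_action() is carried out by
// do_post_resume_action() on the other side of the context switch, after which
// clear_post_resume_action() resets the state.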
} // namespace r1
} // namespace detail
} // namespace tbb

#endif // __TBB_thread_data_H