/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/task_group.h"
#include "governor.h"
#include "thread_data.h"
#include "scheduler_common.h"
#include "itt_notify.h"
#include "task_dispatcher.h"

#include <type_traits>

namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------
tbb_exception_ptr* tbb_exception_ptr::allocate() noexcept {
    tbb_exception_ptr* eptr = (tbb_exception_ptr*)allocate_memory(sizeof(tbb_exception_ptr));
    return eptr ? new (eptr) tbb_exception_ptr(std::current_exception()) : nullptr;
}

void tbb_exception_ptr::destroy() noexcept {
    this->~tbb_exception_ptr();
    deallocate_memory(this);
}

void tbb_exception_ptr::throw_self() {
    if (governor::rethrow_exception_broken()) fix_broken_rethrow();
    std::rethrow_exception(my_ptr);
}

//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

void task_group_context_impl::destroy(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);

    if (ctx.my_context_list != nullptr) {
        __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) == d1::task_group_context::state::bound, nullptr);
        // The owner can be destroyed at any moment. Access the associated data with caution.
        ctx.my_context_list->remove(ctx.my_node);
    }
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
#if _MSC_VER && _MSC_VER <= 1900 && !__INTEL_COMPILER
    suppress_unused_warning(ctl);
#endif
    ctl->~cpu_ctl_env();

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
    }
    ITT_STACK_DESTROY(ctx.my_itt_caller);

    poison_pointer(ctx.my_parent);
    poison_pointer(ctx.my_context_list);
    poison_pointer(ctx.my_node.my_next_node);
    poison_pointer(ctx.my_node.my_prev_node);
    poison_pointer(ctx.my_exception);
    poison_pointer(ctx.my_itt_caller);

    ctx.my_state.store(d1::task_group_context::state::dead, std::memory_order_release);
}

void task_group_context_impl::initialize(d1::task_group_context& ctx) {
    ITT_TASK_GROUP(&ctx, ctx.my_name, nullptr);

    ctx.my_node.my_next_node = &ctx.my_node;
    ctx.my_node.my_prev_node = &ctx.my_node;
    ctx.my_cpu_ctl_env = 0;
    ctx.my_cancellation_requested = 0;
    ctx.my_may_have_children.store(0, std::memory_order_relaxed);
    // Start in the created state; the context becomes bound (or isolated) at first use.
    ctx.my_state.store(d1::task_group_context::state::created, std::memory_order_relaxed);
    ctx.my_parent = nullptr;
    ctx.my_context_list = nullptr;
    ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    ctx.my_itt_caller = nullptr;

    static_assert(sizeof(d1::cpu_ctl_env) <= sizeof(ctx.my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t");
    d1::cpu_ctl_env* ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
    if (ctx.my_traits.fp_settings)
        ctl->get_env();
}

void task_group_context_impl::register_with(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(td, nullptr);
    ctx.my_context_list = td->my_context_list;

    ctx.my_context_list->push_front(ctx.my_node);
}

void task_group_context_impl::bind_to_impl(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) == d1::task_group_context::state::locked, "The context can be bound only under the lock.");
    __TBB_ASSERT(!ctx.my_parent, "Parent is set before initial binding");

    ctx.my_parent = td->my_task_dispatcher->m_execute_data_ext.context;
    __TBB_ASSERT(ctx.my_parent, nullptr);

    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if (!ctx.my_traits.fp_settings)
        copy_fp_settings(ctx, *ctx.my_parent);

    // The condition below prevents unnecessary thrashing of the parent context's cache line.
    if (ctx.my_parent->my_may_have_children.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children) {
        ctx.my_parent->my_may_have_children.store(d1::task_group_context::may_have_children, std::memory_order_relaxed); // full fence is below
    }
    if (ctx.my_parent->my_parent) {
        // Even if this context were made accessible for state change propagation
        // (by placing store_with_release(td->my_context_list_state.head.my_next, &ctx.my_node)
        // above), it still could be missed if state propagation from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, together with epoch counters that
        // detect the possibility of such a race, makes it possible to avoid taking locks
        // when there is no contention.

        // Acquire fence is necessary to prevent reordering of subsequent speculative
        // loads of parent state data out of the scope where the epoch counters comparison
        // can reliably validate it.
        uintptr_t local_count_snapshot = ctx.my_parent->my_context_list->epoch.load(std::memory_order_acquire);
        // Speculative propagation of the parent's state. The speculation will be
        // validated by the epoch counters check further on.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        register_with(ctx, td); // Issues full fence

        // If no state propagation was detected by the following condition, the above
        // full fence guarantees that the parent had the correct state during speculative
        // propagation before the fence. Otherwise the propagation from the parent is
        // repeated under the lock.
        if (local_count_snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
            // Another thread may be propagating a state change right now. So resort to the lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
    } else {
        register_with(ctx, td); // Issues full fence
        // As we do not have grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, and thus it is safe to directly
        // copy the state from it.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }
}

void task_group_context_impl::bind_to(d1::task_group_context& ctx, thread_data* td) {
    d1::task_group_context::state state = ctx.my_state.load(std::memory_order_acquire);
    if (state <= d1::task_group_context::state::locked) {
        if (state == d1::task_group_context::state::created &&
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
            ((std::atomic<typename std::underlying_type<d1::task_group_context::state>::type>&)ctx.my_state).compare_exchange_strong(
                (typename std::underlying_type<d1::task_group_context::state>::type&)state,
                (typename std::underlying_type<d1::task_group_context::state>::type)d1::task_group_context::state::locked)
#else
            ctx.my_state.compare_exchange_strong(state, d1::task_group_context::state::locked)
#endif
        ) {
            // If we are in the outermost task dispatch loop of an external thread, then
            // there is nothing to bind this context to, and we skip the binding part,
            // treating the context as isolated.
            __TBB_ASSERT(td->my_task_dispatcher->m_execute_data_ext.context != nullptr, nullptr);
            d1::task_group_context::state release_state{};
            if (td->my_task_dispatcher->m_execute_data_ext.context == td->my_arena->my_default_ctx || !ctx.my_traits.bound) {
                if (!ctx.my_traits.fp_settings) {
                    copy_fp_settings(ctx, *td->my_arena->my_default_ctx);
                }
                release_state = d1::task_group_context::state::isolated;
            } else {
                bind_to_impl(ctx, td);
                release_state = d1::task_group_context::state::bound;
            }
            ITT_STACK_CREATE(ctx.my_itt_caller);
            ctx.my_state.store(release_state, std::memory_order_release);
        }
        spin_wait_while_eq(ctx.my_state, d1::task_group_context::state::locked);
    }
    __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) != d1::task_group_context::state::created, nullptr);
    __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) != d1::task_group_context::state::locked, nullptr);
}

template <typename T>
void task_group_context_impl::propagate_task_group_state(d1::task_group_context& ctx, std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    /* 1. if ((ctx.*mptr_state).load(std::memory_order_relaxed) == new_state):
          Nothing to do, whether descending from "src" or not, so no need to scan.
          Hopefully this happens often thanks to earlier invocations.
          This optimization is enabled by the LIFO order in the context lists:
          - new contexts are bound to the beginning of lists;
          - descendants are newer than ancestors;
          - earlier invocations are therefore likely to "paint" long chains.
       2. if (&ctx != &src):
          This clause is disjoint from the traversal below, which skips src entirely.
          Note that src.*mptr_state is not necessarily still equal to new_state (another thread may have changed it again).
          Such interference is probably not frequent enough to aim for optimization by writing new_state again (to make the other thread back down).
          Letting the other thread prevail may also be fairer.
    */
    if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state && &ctx != &src) {
        for (d1::task_group_context* ancestor = ctx.my_parent; ancestor != nullptr; ancestor = ancestor->my_parent) {
            if (ancestor == &src) {
                for (d1::task_group_context* c = &ctx; c != ancestor; c = c->my_parent)
                    (c->*mptr_state).store(new_state, std::memory_order_relaxed);
                break;
            }
        }
    }
}

template <typename T>
void thread_data::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // Acquire fence is necessary to ensure that the subsequent node->my_next load
    // returns the correct value in case it was just inserted in another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // reordering of a possible store to *mptr_state past the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

template <typename T>
bool market::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    if (src.my_may_have_children.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children)
        return true;
    // The whole propagation algorithm is under the lock in order to ensure correctness
    // in case of concurrent state changes at different levels of the context tree.
    // See comment 2 at the bottom of this file.
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    if ((src.*mptr_state).load(std::memory_order_relaxed) != new_state)
        // Another thread has concurrently changed the state. Back down.
        return false;
    // Advance the global state propagation epoch.
    ++the_context_state_propagation_epoch;
    // Propagate to all workers and external threads and sync up their local epochs with the global one.
    unsigned num_workers = my_first_unused_worker_idx;
    for (unsigned i = 0; i < num_workers; ++i) {
        thread_data* td = my_workers[i].load(std::memory_order_acquire);
        // If the worker is only about to be registered, skip it.
        if (td)
            td->propagate_task_group_state(mptr_state, src, new_state);
    }
    // Propagate to all external threads.
    // The whole propagation sequence is locked, thus no contention is expected.
    for (thread_data_list_type::iterator it = my_masters.begin(); it != my_masters.end(); it++)
        it->propagate_task_group_state(mptr_state, src, new_state);
    return true;
}

bool task_group_context_impl::cancel_group_execution(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_cancellation_requested.load(std::memory_order_relaxed) <= 1, "The cancellation state can be either 0 or 1");
    if (ctx.my_cancellation_requested.load(std::memory_order_relaxed) || ctx.my_cancellation_requested.exchange(1)) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant would inherit its parent's ctx.my_cancellation_requested,
        // not missing out on any cancellation still being propagated, and a context cannot be uncanceled.)
        return false;
    }
    governor::get_thread_data()->my_arena->my_market->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));
    return true;
}

bool task_group_context_impl::is_group_execution_cancelled(const d1::task_group_context& ctx) {
    return ctx.my_cancellation_requested.load(std::memory_order_relaxed) != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::reset(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add an assertion that this context does not have children.
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were used).

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
        ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    }
    ctx.my_cancellation_requested = 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::capture_fp_settings(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add an assertion that this context does not have children.
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were used).
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
    if (!ctx.my_traits.fp_settings) {
        ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
        ctx.my_traits.fp_settings = true;
    }
    ctl->get_env();
}

void task_group_context_impl::copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(!ctx.my_traits.fp_settings, "The context already has FPU settings.");
    __TBB_ASSERT(src.my_traits.fp_settings, "The source context does not have FPU settings.");

    const d1::cpu_ctl_env* src_ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env(*src_ctl);
    ctx.my_traits.fp_settings = true;
}

/*
    Comments:

    1. The premise of the cancellation support implementation is that cancellations are
       not part of the hot path of program execution. Therefore any change made to reduce
       the overhead of the cancellation control flow must not increase the overhead of
       normal execution.

       In general, contexts are used by all threads, and their descendants are created in
       different threads as well. To minimize the impact of cross-thread tree maintenance
       (first of all, the synchronization it requires), the tree of contexts is split into
       pieces, each of which is handled by a single thread. Such a piece is represented as
       a list of contexts whose members are the contexts that were bound to their parents
       in the given thread.

       The context tree maintenance and cancellation propagation algorithms are designed
       so that cross-thread access to a context list takes place only when a cancellation
       signal is sent (by the user or when an exception happens), and synchronization is
       necessary only then. Thus the normal execution flow (without exceptions and
       cancellation) remains free from any synchronization done on behalf of exception
       handling and cancellation support.

    2. Consider parallel cancellations at different levels of the context tree:

       Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
        |                                      |
       Ctx2                                    |- Thread1 started processing
        |                                   T1 |- Thread2 finishes and syncs up local counters
       Ctx3 <- Cancelled by Thread2            |
        |                                      |- Ctx5 is bound to Ctx2
       Ctx4                                    |
                                            T2 |- Thread1 reaches Ctx2

       The thread propagating each cancellation increments the global counter. However,
       the thread propagating the cancellation from the outermost context (Thread1) may be
       the last to finish. This means that the local counters may be synchronized earlier
       (by Thread2, at time T1) than the cancellation is propagated into Ctx2 (at time T2).
       If a new context (Ctx5) is created and bound to Ctx2 between T1 and T2, checking
       only its parent (Ctx2) may result in the cancellation request being lost.

       This issue is solved by doing the whole propagation under the lock.

       If we need more concurrency while processing parallel cancellations, we could try
       the following modification of the propagation algorithm:

           advance the global counter and remember it
           for each thread:
               scan the thread's list of contexts
           for each thread:
               sync up its local counter only if the global counter has not changed

       However, this version of the algorithm requires more analysis and verification.
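
       For illustration, a rough, unverified sketch of that variant in terms of the
       entities used in this file is given below. This is only a hypothetical elaboration
       of the pseudocode above, not the implemented algorithm; "all_threads" is a
       placeholder for the worker/master iteration done in market::propagate_task_group_state,
       and the exact epoch handling is an assumption.

           uintptr_t snapshot = ++the_context_state_propagation_epoch;  // advance and remember
           for (thread_data& td : all_threads) {
               // Phase 1: paint the descendants of src found in this thread's context list.
               mutex::scoped_lock lock(td.my_context_list->m_mutex);
               for (context_list::iterator it = td.my_context_list->begin(); it != td.my_context_list->end(); ++it) {
                   d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
                   task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
               }
           }
           for (thread_data& td : all_threads) {
               // Phase 2: sync up the thread's local epoch only if no later propagation has
               // started in the meantime; otherwise let that later propagation finish the job.
               if (the_context_state_propagation_epoch.load(std::memory_order_relaxed) == snapshot)
                   td.my_context_list->epoch.store(snapshot, std::memory_order_release);
           }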
*/

void __TBB_EXPORTED_FUNC initialize(d1::task_group_context& ctx) {
    task_group_context_impl::initialize(ctx);
}
void __TBB_EXPORTED_FUNC destroy(d1::task_group_context& ctx) {
    task_group_context_impl::destroy(ctx);
}
void __TBB_EXPORTED_FUNC reset(d1::task_group_context& ctx) {
    task_group_context_impl::reset(ctx);
}
bool __TBB_EXPORTED_FUNC cancel_group_execution(d1::task_group_context& ctx) {
    return task_group_context_impl::cancel_group_execution(ctx);
}
bool __TBB_EXPORTED_FUNC is_group_execution_cancelled(d1::task_group_context& ctx) {
    return task_group_context_impl::is_group_execution_cancelled(ctx);
}
void __TBB_EXPORTED_FUNC capture_fp_settings(d1::task_group_context& ctx) {
    task_group_context_impl::capture_fp_settings(ctx);
}

} // namespace r1
} // namespace detail
} // namespace tbb
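
/*
    Usage note (illustrative sketch only): the exported wrappers above are where the
    public task_group_context API enters this translation unit. Typical user code that
    eventually reaches this path might look roughly like the following; work() is a
    placeholder for a user callback, and the mapping comments are approximate:

        #include <oneapi/tbb/parallel_for.h>
        #include <oneapi/tbb/task_group.h>

        tbb::task_group_context ctx;                         // construction/destruction eventually
                                                             // reach initialize()/destroy() above
        tbb::parallel_for(0, 1000,
                          [](int i) { work(i); }, ctx);      // the context is bound on first use
        ctx.cancel_group_execution();                        // forwards to cancel_group_execution() above
        bool cancelled = ctx.is_group_execution_cancelled(); // forwards to is_group_execution_cancelled()
        ctx.reset();                                         // clears the cancellation flag for reuse
*/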