/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/task_group.h"
#include "governor.h"
#include "thread_data.h"
#include "scheduler_common.h"
#include "itt_notify.h"
#include "task_dispatcher.h"

#include <type_traits>

namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------
tbb_exception_ptr* tbb_exception_ptr::allocate() noexcept {
    tbb_exception_ptr* eptr = (tbb_exception_ptr*)allocate_memory(sizeof(tbb_exception_ptr));
    return eptr ? new (eptr) tbb_exception_ptr(std::current_exception()) : nullptr;
}

void tbb_exception_ptr::destroy() noexcept {
    this->~tbb_exception_ptr();
    deallocate_memory(this);
}

void tbb_exception_ptr::throw_self() {
    if (governor::rethrow_exception_broken()) fix_broken_rethrow();
    std::rethrow_exception(my_ptr);
}

//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

void task_group_context_impl::destroy(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);

    if (ctx.my_context_list != nullptr) {
        __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::bound, nullptr);
        // The owner can be destroyed at any moment. Access the associated data with caution.
        ctx.my_context_list->remove(ctx.my_node);
    }
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
#if _MSC_VER && _MSC_VER <= 1900 && !__INTEL_COMPILER
    suppress_unused_warning(ctl);
#endif
    ctl->~cpu_ctl_env();

    if (ctx.my_exception)
        ctx.my_exception->destroy();
    ITT_STACK_DESTROY(ctx.my_itt_caller);

    poison_pointer(ctx.my_parent);
    poison_pointer(ctx.my_context_list);
    poison_pointer(ctx.my_node.my_next_node);
    poison_pointer(ctx.my_node.my_prev_node);
    poison_pointer(ctx.my_exception);
    poison_pointer(ctx.my_itt_caller);

    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::dead, std::memory_order_release);
}

void task_group_context_impl::initialize(d1::task_group_context& ctx) {
    ITT_TASK_GROUP(&ctx, ctx.my_name, nullptr);

    ctx.my_node.my_next_node = &ctx.my_node;
    ctx.my_node.my_prev_node = &ctx.my_node;
    ctx.my_cpu_ctl_env = 0;
    ctx.my_cancellation_requested = 0;
    ctx.my_state.store(0, std::memory_order_relaxed);
    // The context starts in the "created" state; it transitions to "bound" on first use.
    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::created, std::memory_order_relaxed);
    ctx.my_parent = nullptr;
    ctx.my_context_list = nullptr;
    ctx.my_exception = nullptr;
    ctx.my_itt_caller = nullptr;

    static_assert(sizeof(d1::cpu_ctl_env) <= sizeof(ctx.my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t");
    d1::cpu_ctl_env* ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
    if (ctx.my_traits.fp_settings)
        ctl->get_env();
}
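/*
    Illustrative sketch (not part of the build): initialize() and capture_fp_settings()
    construct a d1::cpu_ctl_env with placement new inside an opaque integral field of the
    public context layout; the static_assert above guards the size. The names below
    (fp_state, context_like, init_fp) are invented for this sketch only.

        #include <cstdint>
        #include <new>

        struct fp_state {                       // stand-in for d1::cpu_ctl_env
            unsigned control_word = 0;
            void get_env() {}                   // would read the FPU/MXCSR state
        };

        struct context_like {                   // stand-in for d1::task_group_context
            std::uint64_t fp_storage = 0;       // opaque storage exposed in the public layout
        };

        void init_fp(context_like& c, bool capture) {
            static_assert(sizeof(fp_state) <= sizeof(c.fp_storage),
                          "FPU settings storage does not fit into uint64_t");
            fp_state* s = new (&c.fp_storage) fp_state;  // placement new into the raw storage
            if (capture)
                s->get_env();
        }
*/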
void task_group_context_impl::register_with(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(td, nullptr);
    ctx.my_context_list = td->my_context_list;

    ctx.my_context_list->push_front(ctx.my_node);
}

void task_group_context_impl::bind_to_impl(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::locked, "The context can be bound only under the lock.");
    __TBB_ASSERT(!ctx.my_parent, "Parent is set before initial binding");

    ctx.my_parent = td->my_task_dispatcher->m_execute_data_ext.context;
    __TBB_ASSERT(ctx.my_parent, nullptr);

    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if (!ctx.my_traits.fp_settings)
        copy_fp_settings(ctx, *ctx.my_parent);

    // The condition below prevents unnecessary thrashing of the parent context's cache line.
    if (ctx.my_parent->my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children) {
        ctx.my_parent->my_state.store(d1::task_group_context::may_have_children, std::memory_order_relaxed); // full fence is below
    }
    if (ctx.my_parent->my_parent) {
        // Even if this context were made accessible for state change propagation
        // (by placing store_with_release(td->my_context_list_state.head.my_next, &ctx.my_node)
        // above), it still could be missed if state propagation from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, together with epoch counters that
        // detect the possibility of such a race, allows us to avoid taking locks when
        // there is no contention.

        // The acquire fence is necessary to prevent reordering of subsequent speculative
        // loads of parent state data out of the scope where the epoch counter comparison
        // can reliably validate them.
        uintptr_t local_count_snapshot = ctx.my_parent->my_context_list->epoch.load(std::memory_order_acquire);
        // Speculative propagation of the parent's state. The speculation will be
        // validated by the epoch counter check further on.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        register_with(ctx, td); // Issues full fence

        // If no state propagation was detected by the following condition, the above
        // full fence guarantees that the parent had the correct state during speculative
        // propagation before the fence. Otherwise the propagation from the parent is
        // repeated under the lock.
        if (local_count_snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
            // Another thread may be propagating a state change right now. So resort to the lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
    } else {
        register_with(ctx, td); // Issues full fence
        // As we do not have grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, and thus it is safe to directly
        // copy the state from it.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }

    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::bound, std::memory_order_release);
}
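/*
    Illustrative model (not part of the build) of the "speculate, then validate against an
    epoch counter" pattern used in bind_to_impl() above. The names below are invented
    stand-ins: global_epoch plays the role of the_context_state_propagation_epoch,
    local_epoch the role of my_context_list->epoch, and the flags the role of
    my_cancellation_requested. This is a simplified sketch, not the actual implementation.

        #include <atomic>
        #include <cstdint>
        #include <mutex>

        std::atomic<std::uintptr_t> global_epoch{0};   // bumped by every locked propagation
        std::atomic<std::uintptr_t> local_epoch{0};    // synced after each context-list scan
        std::atomic<std::uint32_t>  parent_flag{0};    // parent's cancellation flag
        std::atomic<std::uint32_t>  child_flag{0};     // child's cancellation flag
        std::mutex propagation_mutex;

        void bind_child() {
            std::uintptr_t snapshot = local_epoch.load(std::memory_order_acquire);
            // Speculative copy of the parent's state; may race with a concurrent propagation.
            child_flag.store(parent_flag.load(std::memory_order_relaxed), std::memory_order_relaxed);
            std::atomic_thread_fence(std::memory_order_seq_cst);  // register_with() issues a full fence
            if (snapshot != global_epoch.load(std::memory_order_relaxed)) {
                // A propagation may have run concurrently; repeat the copy under the lock.
                std::lock_guard<std::mutex> guard(propagation_mutex);
                child_flag.store(parent_flag.load(std::memory_order_relaxed), std::memory_order_relaxed);
            }
        }
*/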
void task_group_context_impl::bind_to(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    d1::task_group_context::lifetime_state state = ctx.my_lifetime_state.load(std::memory_order_acquire);
    if (state <= d1::task_group_context::lifetime_state::locked) {
        if (state == d1::task_group_context::lifetime_state::created &&
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
            ((std::atomic<typename std::underlying_type<d1::task_group_context::lifetime_state>::type>&)ctx.my_lifetime_state).compare_exchange_strong(
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type&)state,
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type)d1::task_group_context::lifetime_state::locked)
#else
            ctx.my_lifetime_state.compare_exchange_strong(state, d1::task_group_context::lifetime_state::locked)
#endif
        ) {
            // If we are in the outermost task dispatch loop of an external thread, then
            // there is nothing to bind this context to, and we skip the binding part,
            // treating the context as isolated.
            __TBB_ASSERT(td->my_task_dispatcher->m_execute_data_ext.context != nullptr, nullptr);
            if (td->my_task_dispatcher->m_execute_data_ext.context == td->my_arena->my_default_ctx || !ctx.my_traits.bound) {
                if (!ctx.my_traits.fp_settings) {
                    copy_fp_settings(ctx, *td->my_arena->my_default_ctx);
                }
                ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::isolated, std::memory_order_release);
            } else {
                bind_to_impl(ctx, td);
            }
            ITT_STACK_CREATE(ctx.my_itt_caller);
        }
        spin_wait_while_eq(ctx.my_lifetime_state, d1::task_group_context::lifetime_state::locked);
    }
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::created, nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::locked, nullptr);
}

template <typename T>
void task_group_context_impl::propagate_task_group_state(d1::task_group_context& ctx, std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    /*  1. if ((ctx.*mptr_state).load(std::memory_order_relaxed) == new_state):
           Nothing to do, whether descending from "src" or not, so no need to scan.
           Hopefully this happens often thanks to earlier invocations.
           This optimization is enabled by LIFO order in the context lists:
           - new contexts are bound to the beginning of lists;
           - descendants are newer than ancestors;
           - earlier invocations are therefore likely to "paint" long chains.
        2. if (&ctx != &src):
           This clause is disjoint from the traversal below, which skips src entirely.
           Note that src.*mptr_state is not necessarily still equal to new_state
           (another thread may have changed it again).
           Such interference is probably not frequent enough to warrant writing new_state
           again just to make the other thread back down.
           Letting the other thread prevail may also be fairer.
    */
    if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state && &ctx != &src) {
        for (d1::task_group_context* ancestor = ctx.my_parent; ancestor != nullptr; ancestor = ancestor->my_parent) {
            if (ancestor == &src) {
                for (d1::task_group_context* c = &ctx; c != ancestor; c = c->my_parent)
                    (c->*mptr_state).store(new_state, std::memory_order_relaxed);
                break;
            }
        }
    }
}
template <typename T>
void thread_data::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // The acquire fence is necessary to ensure that the subsequent load of a node's my_next
    // returns the correct value in case the node was just inserted by another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // reordering of a possible store to *mptr_state after the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

template <typename T>
bool market::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    if (src.my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children)
        return true;
    // The whole propagation algorithm is performed under the lock in order to ensure correctness
    // in case of concurrent state changes at different levels of the context tree.
    // See the comment at the bottom of this file.
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    if ((src.*mptr_state).load(std::memory_order_relaxed) != new_state)
        // Another thread has concurrently changed the state. Back down.
        return false;
    // Advance the global state propagation epoch.
    ++the_context_state_propagation_epoch;
    // Propagate to all workers and external threads and sync up their local epochs with the global one.
    unsigned num_workers = my_first_unused_worker_idx;
    for (unsigned i = 0; i < num_workers; ++i) {
        thread_data* td = my_workers[i];
        // If the worker is only about to be registered, skip it.
        if (td)
            td->propagate_task_group_state(mptr_state, src, new_state);
    }
    // Propagate to all external threads.
    // The whole propagation sequence is locked, thus no contention is expected.
    for (thread_data_list_type::iterator it = my_masters.begin(); it != my_masters.end(); ++it)
        it->propagate_task_group_state(mptr_state, src, new_state);
    return true;
}
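/*
    Continuing the illustrative model sketched after bind_to_impl() above (same invented
    global_epoch / local_epoch / parent_flag / child_flag / propagation_mutex): this is,
    roughly, the propagation side that market::propagate_task_group_state() implements.
    It is a simplified sketch under those assumptions, not the actual implementation.

        bool propagate(std::uint32_t new_state) {
            std::lock_guard<std::mutex> guard(propagation_mutex);
            if (parent_flag.load(std::memory_order_relaxed) != new_state)
                return false;                   // another thread changed the state; back down
            // Invalidate any speculation that started before this propagation.
            global_epoch.fetch_add(1, std::memory_order_relaxed);
            // ... walk every thread's context list and paint descendants of the source ...
            child_flag.store(new_state, std::memory_order_relaxed);
            // Publish the scan: a binder that took an older local_epoch snapshot will notice
            // the mismatch with global_epoch and repeat its copy under the lock.
            local_epoch.store(global_epoch.load(std::memory_order_relaxed), std::memory_order_release);
            return true;
        }
*/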
bool task_group_context_impl::cancel_group_execution(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_cancellation_requested.load(std::memory_order_relaxed) <= 1, "The cancellation state can be either 0 or 1");
    if (ctx.my_cancellation_requested.load(std::memory_order_relaxed) || ctx.my_cancellation_requested.exchange(1)) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant would inherit its parent's ctx.my_cancellation_requested,
        // not missing out on any cancellation still being propagated, and a context cannot be uncanceled.)
        return false;
    }
    governor::get_thread_data()->my_arena->my_market->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));
    return true;
}

bool task_group_context_impl::is_group_execution_cancelled(const d1::task_group_context& ctx) {
    return ctx.my_cancellation_requested.load(std::memory_order_relaxed) != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::reset(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing happened (which means necessary fences were used).
    if (ctx.my_exception) {
        ctx.my_exception->destroy();
        ctx.my_exception = nullptr;
    }
    ctx.my_cancellation_requested = 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::capture_fp_settings(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing happened (which means necessary fences were used).
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
    if (!ctx.my_traits.fp_settings) {
        ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
        ctx.my_traits.fp_settings = true;
    }
    ctl->get_env();
}

void task_group_context_impl::copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(!ctx.my_traits.fp_settings, "The context already has FPU settings.");
    __TBB_ASSERT(src.my_traits.fp_settings, "The source context does not have FPU settings.");

    const d1::cpu_ctl_env* src_ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env(*src_ctl);
    ctx.my_traits.fp_settings = true;
}
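/*
    Illustrative sketch (not part of the build): the user-facing path that reaches
    cancel_group_execution() above goes through the public tbb::task_group_context API,
    for example via the parallel_for overload that accepts a context. This is a minimal
    usage example, not part of this translation unit.

        #include <oneapi/tbb/blocked_range.h>
        #include <oneapi/tbb/parallel_for.h>
        #include <oneapi/tbb/task_group.h>

        void example(int n) {
            oneapi::tbb::task_group_context ctx;
            oneapi::tbb::parallel_for(oneapi::tbb::blocked_range<int>(0, n),
                [&](const oneapi::tbb::blocked_range<int>& r) {
                    for (int i = r.begin(); i != r.end(); ++i) {
                        bool failed = false;  // placeholder for a real failure condition
                        if (failed)
                            ctx.cancel_group_execution();  // lands in r1::cancel_group_execution()
                    }
                },
                ctx);
            if (ctx.is_group_execution_cancelled()) {
                // The cancellation was propagated to this context and its descendants.
            }
        }
*/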
/*
    Comments:

    1.  The premise of the cancellation support implementation is that cancellations are
        not part of the hot path of the program execution. Therefore any changes made to its
        implementation to reduce the overhead of the cancellation control flow must not
        increase the overhead of normal execution.

        In general, contexts are used by all threads, and their descendants are created in
        different threads as well. In order to minimize the impact of cross-thread tree
        maintenance (most of all, the cost of synchronization), the tree of contexts is
        split into pieces, each of which is handled by a single thread. Each piece is
        represented as a list of contexts whose members were bound to their parents in the
        given thread.

        The context tree maintenance and cancellation propagation algorithms are designed
        in such a manner that cross-thread access to a context list takes place only when
        a cancellation signal is sent (by the user or when an exception happens), and
        synchronization is necessary only then. Thus the normal execution flow (without
        exceptions and cancellation) remains free from any synchronization done on behalf
        of exception handling and cancellation support.

    2.  Consider parallel cancellations at different levels of the context tree:

        Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
         |                                      |
        Ctx2                                    |- Thread1 started processing
         |                                   T1 |- Thread2 finishes and syncs up local counters
        Ctx3 <- Cancelled by Thread2            |
         |                                      |- Ctx5 is bound to Ctx2
        Ctx4                                    |
                                             T2 |- Thread1 reaches Ctx2

        The thread propagating each cancellation increments the global counter. However, the
        thread propagating the cancellation from the outermost context (Thread1) may be the
        last one to finish. This means that the local counters may be synchronized earlier
        (by Thread2, at time T1) than the cancellation is propagated into Ctx2 (at time T2).
        If a new context (Ctx5) is created and bound to Ctx2 between T1 and T2, checking only
        its parent (Ctx2) may result in the cancellation request being lost.

        This issue is solved by doing the whole propagation under the lock.

        If we need more concurrency while processing parallel cancellations, we could try
        the following modification of the propagation algorithm:

            advance the global counter and remember it
            for each thread:
                scan the thread's list of contexts
            for each thread:
                sync up its local counter only if the global counter has not been changed

        However, this version of the algorithm requires more analysis and verification.
*/

void __TBB_EXPORTED_FUNC initialize(d1::task_group_context& ctx) {
    task_group_context_impl::initialize(ctx);
}
void __TBB_EXPORTED_FUNC destroy(d1::task_group_context& ctx) {
    task_group_context_impl::destroy(ctx);
}
void __TBB_EXPORTED_FUNC reset(d1::task_group_context& ctx) {
    task_group_context_impl::reset(ctx);
}
bool __TBB_EXPORTED_FUNC cancel_group_execution(d1::task_group_context& ctx) {
    return task_group_context_impl::cancel_group_execution(ctx);
}
bool __TBB_EXPORTED_FUNC is_group_execution_cancelled(d1::task_group_context& ctx) {
    return task_group_context_impl::is_group_execution_cancelled(ctx);
}
void __TBB_EXPORTED_FUNC capture_fp_settings(d1::task_group_context& ctx) {
    task_group_context_impl::capture_fp_settings(ctx);
}

} // namespace r1
} // namespace detail
} // namespace tbb