/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/task_group.h"
#include "governor.h"
#include "thread_data.h"
#include "scheduler_common.h"
#include "itt_notify.h"
#include "task_dispatcher.h"

#include <type_traits>

namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------
tbb_exception_ptr* tbb_exception_ptr::allocate() noexcept {
    tbb_exception_ptr* eptr = (tbb_exception_ptr*)allocate_memory(sizeof(tbb_exception_ptr));
    return eptr ? new (eptr) tbb_exception_ptr(std::current_exception()) : nullptr;
}

void tbb_exception_ptr::destroy() noexcept {
    this->~tbb_exception_ptr();
    deallocate_memory(this);
}

void tbb_exception_ptr::throw_self() {
    if (governor::rethrow_exception_broken()) fix_broken_rethrow();
    std::rethrow_exception(my_ptr);
}

//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

void task_group_context_impl::destroy(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);

    if (ctx.my_context_list != nullptr) {
        __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::bound, nullptr);
        // The owner can be destroyed at any moment. Access the associated data with caution.
        ctx.my_context_list->remove(ctx.my_node);
    }
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
#if _MSC_VER && _MSC_VER <= 1900 && !__INTEL_COMPILER
    suppress_unused_warning(ctl);
#endif
    ctl->~cpu_ctl_env();

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
    }
    ITT_STACK_DESTROY(ctx.my_itt_caller);

    poison_pointer(ctx.my_parent);
    poison_pointer(ctx.my_context_list);
    poison_pointer(ctx.my_node.my_next_node);
    poison_pointer(ctx.my_node.my_prev_node);
    poison_pointer(ctx.my_exception);
    poison_pointer(ctx.my_itt_caller);

    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::dead, std::memory_order_release);
}

void task_group_context_impl::initialize(d1::task_group_context& ctx) {
    ITT_TASK_GROUP(&ctx, ctx.my_name, nullptr);

    ctx.my_node.my_next_node = &ctx.my_node;
    ctx.my_node.my_prev_node = &ctx.my_node;
    ctx.my_cpu_ctl_env = 0;
    ctx.my_cancellation_requested = 0;
    ctx.my_state.store(0, std::memory_order_relaxed);
    // The context starts in the created state; it is switched to bound at the first use.
    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::created, std::memory_order_relaxed);
    ctx.my_parent = nullptr;
    ctx.my_context_list = nullptr;
    ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    ctx.my_itt_caller = nullptr;

    static_assert(sizeof(d1::cpu_ctl_env) <= sizeof(ctx.my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t");
    d1::cpu_ctl_env* ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
    if (ctx.my_traits.fp_settings)
        ctl->get_env();
}

void task_group_context_impl::register_with(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(td, nullptr);
    ctx.my_context_list = td->my_context_list;

    ctx.my_context_list->push_front(ctx.my_node);
}

void task_group_context_impl::bind_to_impl(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::locked, "The context can be bound only under the lock.");
    __TBB_ASSERT(!ctx.my_parent, "Parent is set before initial binding");

    ctx.my_parent = td->my_task_dispatcher->m_execute_data_ext.context;
    __TBB_ASSERT(ctx.my_parent, nullptr);

    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if (!ctx.my_traits.fp_settings)
        copy_fp_settings(ctx, *ctx.my_parent);

    // The check below prevents unnecessary thrashing of the parent context's cache line.
    if (ctx.my_parent->my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children) {
        ctx.my_parent->my_state.store(d1::task_group_context::may_have_children, std::memory_order_relaxed); // full fence is below
    }
    if (ctx.my_parent->my_parent) {
        // Even if this context were made accessible for state change propagation
        // (by placing store_with_release(td->my_context_list_state.head.my_next, &ctx.my_node)
        // above), it still could be missed if state propagation from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, together with epoch counters that
        // detect the possibility of such a race, makes it possible to avoid taking
        // locks when there is no contention.

        // An acquire fence is necessary to prevent reordering of subsequent speculative
        // loads of parent state data out of the scope where the epoch counters comparison
        // can reliably validate them.
        uintptr_t local_count_snapshot = ctx.my_parent->my_context_list->epoch.load(std::memory_order_acquire);
        // Speculative propagation of the parent's state. The speculation will be
        // validated by the epoch counters check further on.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        register_with(ctx, td); // Issues full fence

        // If no state propagation was detected by the condition below, the above
        // full fence guarantees that the parent had the correct state during the
        // speculative propagation before the fence. Otherwise the propagation from
        // the parent is repeated under the lock.
        if (local_count_snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
            // Another thread may be propagating a state change right now. So resort to the lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
    } else {
        register_with(ctx, td); // Issues full fence
        // As we do not have grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, and thus it is safe to directly
        // copy the state from it.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }
}

void task_group_context_impl::bind_to(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    d1::task_group_context::lifetime_state state = ctx.my_lifetime_state.load(std::memory_order_acquire);
    if (state <= d1::task_group_context::lifetime_state::locked) {
        if (state == d1::task_group_context::lifetime_state::created &&
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
            ((std::atomic<typename std::underlying_type<d1::task_group_context::lifetime_state>::type>&)ctx.my_lifetime_state).compare_exchange_strong(
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type&)state,
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type)d1::task_group_context::lifetime_state::locked)
#else
            ctx.my_lifetime_state.compare_exchange_strong(state, d1::task_group_context::lifetime_state::locked)
#endif
        ) {
            // If we are in the outermost task dispatch loop of an external thread, then
            // there is nothing to bind this context to, and we skip the binding part,
            // treating the context as isolated.
            __TBB_ASSERT(td->my_task_dispatcher->m_execute_data_ext.context != nullptr, nullptr);
            d1::task_group_context::lifetime_state release_state{};
            if (td->my_task_dispatcher->m_execute_data_ext.context == td->my_arena->my_default_ctx || !ctx.my_traits.bound) {
                if (!ctx.my_traits.fp_settings) {
                    copy_fp_settings(ctx, *td->my_arena->my_default_ctx);
                }
                release_state = d1::task_group_context::lifetime_state::isolated;
            } else {
                bind_to_impl(ctx, td);
                release_state = d1::task_group_context::lifetime_state::bound;
            }
            ITT_STACK_CREATE(ctx.my_itt_caller);
            ctx.my_lifetime_state.store(release_state, std::memory_order_release);
        }
        spin_wait_while_eq(ctx.my_lifetime_state, d1::task_group_context::lifetime_state::locked);
    }
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::created, nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::locked, nullptr);
}

template <typename T>
void task_group_context_impl::propagate_task_group_state(d1::task_group_context& ctx, std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    /* 1. if ((ctx.*mptr_state).load(std::memory_order_relaxed) == new_state):
          Nothing to do, whether descending from "src" or not, so no need to scan.
          Hopefully this happens often thanks to earlier invocations.
          This optimization is enabled by the LIFO order in the context lists:
          - new contexts are bound to the beginning of lists;
          - descendants are newer than ancestors;
          - earlier invocations are therefore likely to "paint" long chains.
       2. if (&ctx != &src):
          This check is separate from the traversal below, which skips src entirely.
          Note that src.*mptr_state is not necessarily still equal to new_state (another thread may have changed it again).
          Such interference is probably not frequent enough to justify writing new_state again (to make the other thread back down).
          Letting the other thread prevail may also be fairer.
    */
    if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state && &ctx != &src) {
        for (d1::task_group_context* ancestor = ctx.my_parent; ancestor != nullptr; ancestor = ancestor->my_parent) {
            if (ancestor == &src) {
                for (d1::task_group_context* c = &ctx; c != ancestor; c = c->my_parent)
                    (c->*mptr_state).store(new_state, std::memory_order_relaxed);
                break;
            }
        }
    }
}

template <typename T>
void thread_data::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // An acquire fence is necessary to ensure that the subsequent node->my_next load
    // returns the correct value in case the node was just inserted by another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // reordering of a possible store to *mptr_state after the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

template <typename T>
bool market::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    if (src.my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children)
        return true;
    // The whole propagation algorithm is performed under the lock in order to ensure correctness
    // in case of concurrent state changes at different levels of the context tree.
    // See the comment at the bottom of this file.
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    if ((src.*mptr_state).load(std::memory_order_relaxed) != new_state)
        // Another thread has concurrently changed the state. Back down.
        return false;
    // Advance the global state propagation epoch.
    ++the_context_state_propagation_epoch;
    // Propagate to all workers and external threads and sync up their local epochs with the global one.
    unsigned num_workers = my_first_unused_worker_idx;
    for (unsigned i = 0; i < num_workers; ++i) {
        thread_data* td = my_workers[i];
        // If the worker is only about to be registered, skip it.
        if (td)
            td->propagate_task_group_state(mptr_state, src, new_state);
    }
    // Propagate to all external threads.
    // The whole propagation sequence is locked, thus no contention is expected.
    for (thread_data_list_type::iterator it = my_masters.begin(); it != my_masters.end(); it++)
        it->propagate_task_group_state(mptr_state, src, new_state);
    return true;
}

bool task_group_context_impl::cancel_group_execution(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_cancellation_requested.load(std::memory_order_relaxed) <= 1, "The cancellation state can be either 0 or 1");
    if (ctx.my_cancellation_requested.load(std::memory_order_relaxed) || ctx.my_cancellation_requested.exchange(1)) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant would inherit its parent's ctx.my_cancellation_requested,
        // not missing out on any cancellation still being propagated, and a context cannot be uncanceled.)
        return false;
    }
    governor::get_thread_data()->my_arena->my_market->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));
    return true;
}

bool task_group_context_impl::is_group_execution_cancelled(const d1::task_group_context& ctx) {
    return ctx.my_cancellation_requested.load(std::memory_order_relaxed) != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::reset(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add an assertion that this context does not have children.
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were already used).

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
        ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    }
    ctx.my_cancellation_requested = 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::capture_fp_settings(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add an assertion that this context does not have children.
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were already used).
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
    if (!ctx.my_traits.fp_settings) {
        ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
        ctx.my_traits.fp_settings = true;
    }
    ctl->get_env();
}

void task_group_context_impl::copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(!ctx.my_traits.fp_settings, "The context already has FPU settings.");
    __TBB_ASSERT(src.my_traits.fp_settings, "The source context does not have FPU settings.");

    const d1::cpu_ctl_env* src_ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env(*src_ctl);
    ctx.my_traits.fp_settings = true;
}

/*
Comments:

1. The premise of the cancellation support implementation is that cancellations are
   not part of the hot path of program execution. Therefore any change made to reduce
   the overhead of the cancellation control flow must not increase the overhead of
   normal execution.

   In general, contexts are used by all threads, and their descendants are created in
   different threads as well. To minimize the impact of cross-thread tree maintenance
   (first of all, the synchronization it requires), the tree of contexts is split into
   pieces, each of which is handled by a single thread. Each piece is represented as a
   list of the contexts that were bound to their parents in the given thread.

   The context tree maintenance and cancellation propagation algorithms are designed
   in such a manner that cross-thread access to a context list takes place only when
   a cancellation signal is sent (by the user or when an exception happens), and
   synchronization is necessary only then. Thus the normal execution flow (without
   exceptions and cancellation) remains free from any synchronization done on
   behalf of exception handling and cancellation support.

2. Consider parallel cancellations at different levels of the context tree:

   Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
    |                                      |
   Ctx2                                    |- Thread1 started processing
    |                                   T1 |- Thread2 finishes and syncs up local counters
   Ctx3 <- Cancelled by Thread2            |
    |                                      |- Ctx5 is bound to Ctx2
   Ctx4                                    |
                                        T2 |- Thread1 reaches Ctx2

   The thread propagating each cancellation increments the global counter. However, the
   thread propagating the cancellation from the outermost context (Thread1) may be the
   last to finish, which means the local counters may be synchronized (by Thread2, at
   time T1) before the cancellation has been propagated into Ctx2 (at time T2). If a new
   context (Ctx5) is created and bound to Ctx2 between T1 and T2, checking only its
   parent (Ctx2) may result in the cancellation request being lost.

   This issue is solved by doing the whole propagation under the lock.

   If we need more concurrency while processing parallel cancellations, we could try
   the following modification of the propagation algorithm:

       advance the global counter and remember it
       for each thread:
           scan the thread's list of contexts
       for each thread:
           sync up its local counter only if the global counter has not been changed

   However, this version of the algorithm requires more analysis and verification.
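
3. For illustration only: a minimal sketch (not part of the library) of how the
   cancellation machinery above is typically reached from user code through the
   public API. The predicate found_it() is hypothetical and merely gives the body
   a reason to cancel.

       #include <oneapi/tbb/blocked_range.h>
       #include <oneapi/tbb/parallel_for.h>
       #include <oneapi/tbb/task_group.h>   // public tbb::task_group_context

       bool found_it(int i);                // hypothetical early-exit predicate

       void search(int n) {
           tbb::task_group_context ctx;     // set up via initialize() above
           tbb::parallel_for(tbb::blocked_range<int>(0, n),
               [&](const tbb::blocked_range<int>& r) {
                   for (int i = r.begin(); i != r.end(); ++i) {
                       if (found_it(i)) {
                           // Routes to cancel_group_execution() above, which
                           // propagates the request down the context tree.
                           ctx.cancel_group_execution();
                           return;
                       }
                   }
               },
               ctx);                        // bind the algorithm to ctx
           // Reported by is_group_execution_cancelled() above.
           bool was_cancelled = ctx.is_group_execution_cancelled();
           (void)was_cancelled;
       }

   Only the cancellation request takes the propagation path described in comment 2;
   a run that is never cancelled performs no extra synchronization, per comment 1.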
*/

void __TBB_EXPORTED_FUNC initialize(d1::task_group_context& ctx) {
    task_group_context_impl::initialize(ctx);
}
void __TBB_EXPORTED_FUNC destroy(d1::task_group_context& ctx) {
    task_group_context_impl::destroy(ctx);
}
void __TBB_EXPORTED_FUNC reset(d1::task_group_context& ctx) {
    task_group_context_impl::reset(ctx);
}
bool __TBB_EXPORTED_FUNC cancel_group_execution(d1::task_group_context& ctx) {
    return task_group_context_impl::cancel_group_execution(ctx);
}
bool __TBB_EXPORTED_FUNC is_group_execution_cancelled(d1::task_group_context& ctx) {
    return task_group_context_impl::is_group_execution_cancelled(ctx);
}
void __TBB_EXPORTED_FUNC capture_fp_settings(d1::task_group_context& ctx) {
    task_group_context_impl::capture_fp_settings(ctx);
}

} // namespace r1
} // namespace detail
} // namespace tbb