/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/task_group.h"
#include "governor.h"
#include "thread_data.h"
#include "scheduler_common.h"
#include "itt_notify.h"
#include "task_dispatcher.h"

#include <type_traits>

namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------
tbb_exception_ptr* tbb_exception_ptr::allocate() noexcept {
    tbb_exception_ptr* eptr = (tbb_exception_ptr*)allocate_memory(sizeof(tbb_exception_ptr));
    return eptr ? new (eptr) tbb_exception_ptr(std::current_exception()) : nullptr;
}

void tbb_exception_ptr::destroy() noexcept {
    this->~tbb_exception_ptr();
    deallocate_memory(this);
}

void tbb_exception_ptr::throw_self() {
    if (governor::rethrow_exception_broken()) fix_broken_rethrow();
    std::rethrow_exception(my_ptr);
}
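
// Illustrative sketch (not the library's real call sites): allocate() captures the
// in-flight exception on the thread where it is caught, throw_self() re-raises it on
// the thread that waits for the work, and destroy() is what the context's reset()
// and destroy() below use to release the stored wrapper. run_user_body() is a
// hypothetical placeholder.
//
//     tbb_exception_ptr* eptr = nullptr;
//     try {
//         run_user_body();
//     } catch (...) {
//         eptr = tbb_exception_ptr::allocate();   // wraps std::current_exception()
//     }
//     // ... later, on the waiting thread:
//     if (eptr) {
//         eptr->throw_self();   // rethrows; the wrapper itself is released via destroy()
//     }                         // when the owning task_group_context is reset or destroyed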

//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

void task_group_context_impl::destroy(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);

    if (ctx.my_context_list != nullptr) {
        __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::bound, nullptr);
        // The owner can be destroyed at any moment. Access the associated data with caution.
        ctx.my_context_list->remove(ctx.my_node);
    }
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
#if _MSC_VER && _MSC_VER <= 1900 && !__INTEL_COMPILER
    suppress_unused_warning(ctl);
#endif
    ctl->~cpu_ctl_env();

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
    }
    ITT_STACK_DESTROY(ctx.my_itt_caller);

    poison_pointer(ctx.my_parent);
    poison_pointer(ctx.my_context_list);
    poison_pointer(ctx.my_node.my_next_node);
    poison_pointer(ctx.my_node.my_prev_node);
    poison_pointer(ctx.my_exception);
    poison_pointer(ctx.my_itt_caller);

    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::dead, std::memory_order_release);
}

void task_group_context_impl::initialize(d1::task_group_context& ctx) {
    ITT_TASK_GROUP(&ctx, ctx.my_name, nullptr);

    ctx.my_node.my_next_node = &ctx.my_node;
    ctx.my_node.my_prev_node = &ctx.my_node;
    ctx.my_cpu_ctl_env = 0;
    ctx.my_cancellation_requested = 0;
    ctx.my_state.store(0, std::memory_order_relaxed);
    // The context starts in the 'created' state; it becomes 'bound' (or 'isolated') at its first use.
    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::created, std::memory_order_relaxed);
    ctx.my_parent = nullptr;
    ctx.my_context_list = nullptr;
    ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    ctx.my_itt_caller = nullptr;

    static_assert(sizeof(d1::cpu_ctl_env) <= sizeof(ctx.my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t");
    d1::cpu_ctl_env* ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
    if (ctx.my_traits.fp_settings)
        ctl->get_env();
}

void task_group_context_impl::register_with(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(td, nullptr);
    ctx.my_context_list = td->my_context_list;

    ctx.my_context_list->push_front(ctx.my_node);
}

void task_group_context_impl::bind_to_impl(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::locked, "The context can be bound only under the lock.");
    __TBB_ASSERT(!ctx.my_parent, "Parent is set before initial binding");

    ctx.my_parent = td->my_task_dispatcher->m_execute_data_ext.context;
    __TBB_ASSERT(ctx.my_parent, nullptr);

    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if (!ctx.my_traits.fp_settings)
        copy_fp_settings(ctx, *ctx.my_parent);

    // The condition below prevents unnecessary thrashing of the parent context's cache line.
    if (ctx.my_parent->my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children) {
        ctx.my_parent->my_state.store(d1::task_group_context::may_have_children, std::memory_order_relaxed); // full fence is below
    }
    if (ctx.my_parent->my_parent) {
        // Even if this context were made accessible for state change propagation
        // (by placing store_with_release(td->my_context_list_state.head.my_next, &ctx.my_node)
        // above), a state change could still be missed if propagation from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, combined with epoch counters that
        // detect the possibility of such a race, allows us to avoid taking locks when
        // there is no contention.

        // The acquire fence is necessary to prevent reordering of the subsequent speculative
        // loads of the parent's state data out of the scope where the epoch counter comparison
        // can reliably validate them.
        uintptr_t local_count_snapshot = ctx.my_parent->my_context_list->epoch.load(std::memory_order_acquire);
        // Speculative propagation of the parent's state. The speculation will be
        // validated by the epoch counter check further on.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        register_with(ctx, td); // Issues full fence

        // If no state propagation is detected by the condition below, the above full fence
        // guarantees that the parent had the correct state during the speculative
        // propagation before the fence. Otherwise the propagation from the parent is
        // repeated under the lock.
        if (local_count_snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
            // Another thread may be propagating a state change right now. So resort to the lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
    } else {
        register_with(ctx, td); // Issues full fence
        // Since there are no grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, so it is safe to directly
        // copy the state from it.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }
}
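
// The snapshot/validate pattern used above, reduced to its skeleton (illustrative
// only; parent_list_epoch, parent_state, my_state and publish_to_parent_list() are
// stand-ins for the members used in bind_to_impl, not names from this library):
//
//     uintptr_t snapshot = parent_list_epoch.load(std::memory_order_acquire); // 1. remember the parent's epoch
//     my_state = parent_state.load(std::memory_order_relaxed);                // 2. copy the parent's state speculatively
//     publish_to_parent_list();                                               // 3. full fence: propagators can now see us
//     if (snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
//         // 4. a propagation may have overlapped steps 1-3: redo the copy under the lock
//         context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
//         my_state = parent_state.load(std::memory_order_relaxed);
//     }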

void task_group_context_impl::bind_to(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    d1::task_group_context::lifetime_state state = ctx.my_lifetime_state.load(std::memory_order_acquire);
    if (state <= d1::task_group_context::lifetime_state::locked) {
        if (state == d1::task_group_context::lifetime_state::created &&
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
            ((std::atomic<typename std::underlying_type<d1::task_group_context::lifetime_state>::type>&)ctx.my_lifetime_state).compare_exchange_strong(
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type&)state,
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type)d1::task_group_context::lifetime_state::locked)
#else
            ctx.my_lifetime_state.compare_exchange_strong(state, d1::task_group_context::lifetime_state::locked)
#endif
            ) {
            // If we are in the outermost task dispatch loop of an external thread, then
            // there is nothing to bind this context to, and we skip the binding part,
            // treating the context as isolated.
            __TBB_ASSERT(td->my_task_dispatcher->m_execute_data_ext.context != nullptr, nullptr);
            d1::task_group_context::lifetime_state release_state{};
            if (td->my_task_dispatcher->m_execute_data_ext.context == td->my_arena->my_default_ctx || !ctx.my_traits.bound) {
                if (!ctx.my_traits.fp_settings) {
                    copy_fp_settings(ctx, *td->my_arena->my_default_ctx);
                }
                release_state = d1::task_group_context::lifetime_state::isolated;
            } else {
                bind_to_impl(ctx, td);
                release_state = d1::task_group_context::lifetime_state::bound;
            }
            ITT_STACK_CREATE(ctx.my_itt_caller);
            ctx.my_lifetime_state.store(release_state, std::memory_order_release);
        }
        spin_wait_while_eq(ctx.my_lifetime_state, d1::task_group_context::lifetime_state::locked);
    }
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::created, nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::locked, nullptr);
}

template <typename T>
void task_group_context_impl::propagate_task_group_state(d1::task_group_context& ctx, std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    /*  1. if ((ctx.*mptr_state).load(std::memory_order_relaxed) == new_state):
            Nothing to do, whether ctx descends from "src" or not, so no need to scan.
            Hopefully this happens often thanks to earlier invocations.
            This optimization is enabled by the LIFO order in the context lists:
                - new contexts are bound to the beginning of lists;
                - descendants are newer than ancestors;
                - earlier invocations are therefore likely to "paint" long chains.
        2. if (&ctx != &src):
            This clause is disjoint from the traversal below, which skips src entirely.
            Note that src.*mptr_state is not necessarily still equal to new_state (another thread may have changed it again).
            Such interference is probably not frequent enough to justify writing new_state again (to make the other thread back down).
            Letting the other thread prevail may also be fairer.
    */
    if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state && &ctx != &src) {
        for (d1::task_group_context* ancestor = ctx.my_parent; ancestor != nullptr; ancestor = ancestor->my_parent) {
            if (ancestor == &src) {
                for (d1::task_group_context* c = &ctx; c != ancestor; c = c->my_parent)
                    (c->*mptr_state).store(new_state, std::memory_order_relaxed);
                break;
            }
        }
    }
}
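
// Worked example (illustrative only; Root, A, B, C are hypothetical contexts, not
// names from this library). Given the parent chain Root <- A <- B <- C and the call
// propagate_task_group_state(C, &d1::task_group_context::my_cancellation_requested, A, 1u):
//
//     C.my_cancellation_requested != 1 and &C != &A     -> the scan is needed;
//     walking C's ancestors (B, then A) finds src == A  -> C does descend from src;
//     the inner loop then paints C and B with 1 and stops at A, which the caller
//     (e.g. cancel_group_execution) has already set before starting the propagation.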

template <typename T>
void thread_data::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // The acquire fence is necessary to ensure that the subsequent loads of the list
    // nodes return the correct values in case a node was just inserted by another thread.
    // The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // reordering of a possible store to *mptr_state after the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}

template <typename T>
bool market::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    if (src.my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children)
        return true;
    // The whole propagation algorithm is executed under the lock in order to ensure correctness
    // in case of concurrent state changes at different levels of the context tree.
    // See the comment block at the bottom of this file.
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    if ((src.*mptr_state).load(std::memory_order_relaxed) != new_state)
        // Another thread has concurrently changed the state. Back down.
        return false;
    // Advance the global state propagation epoch
    ++the_context_state_propagation_epoch;
    // Propagate to all workers and external threads and sync up their local epochs with the global one
    unsigned num_workers = my_first_unused_worker_idx;
    for (unsigned i = 0; i < num_workers; ++i) {
        thread_data* td = my_workers[i].load(std::memory_order_acquire);
        // If the worker is only about to be registered, skip it.
        if (td)
            td->propagate_task_group_state(mptr_state, src, new_state);
    }
    // Propagate to all external threads
    // The whole propagation sequence is locked, thus no contention is expected
    for (thread_data_list_type::iterator it = my_masters.begin(); it != my_masters.end(); it++)
        it->propagate_task_group_state(mptr_state, src, new_state);
    return true;
}

bool task_group_context_impl::cancel_group_execution(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_cancellation_requested.load(std::memory_order_relaxed) <= 1, "The cancellation state can be either 0 or 1");
    if (ctx.my_cancellation_requested.load(std::memory_order_relaxed) || ctx.my_cancellation_requested.exchange(1)) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant inherits its parent's ctx.my_cancellation_requested,
        // so it cannot miss a cancellation that is still being propagated, and a context
        // cannot be uncanceled.)
        return false;
    }
    governor::get_thread_data()->my_arena->my_market->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));
    return true;
}
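
// How this entry point is reached from user code, as a minimal sketch (illustrative
// only; the forwarding layers live in the public headers, not in this file):
//
//     #include <oneapi/tbb/parallel_for.h>
//     #include <oneapi/tbb/task_group.h>
//
//     tbb::task_group_context ctx;
//     tbb::parallel_for(0, 1000000, [&](int) {
//         if (ctx.is_group_execution_cancelled()) return;   // cooperative early exit
//         /* user work; an exception thrown here also cancels ctx */
//     }, ctx);
//     // Meanwhile, another thread may call:
//     //     ctx.cancel_group_execution();   // routes to r1::cancel_group_execution(ctx)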

bool task_group_context_impl::is_group_execution_cancelled(const d1::task_group_context& ctx) {
    return ctx.my_cancellation_requested.load(std::memory_order_relaxed) != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::reset(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which implies that the necessary fences were used).

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
        ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    }
    ctx.my_cancellation_requested = 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::capture_fp_settings(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which implies that the necessary fences were used).
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
    if (!ctx.my_traits.fp_settings) {
        ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
        ctx.my_traits.fp_settings = true;
    }
    ctl->get_env();
}
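
// User-level view of the capture above, as a minimal sketch (illustrative only;
// std::fesetround() is standard C/C++, and the oneTBB calls are the public wrappers
// over capture_fp_settings/copy_fp_settings in this file):
//
//     #include <cfenv>
//     #include <oneapi/tbb/parallel_for.h>
//     #include <oneapi/tbb/task_group.h>
//
//     std::fesetround(FE_TOWARDZERO);   // set the desired FPU mode on this thread
//     tbb::task_group_context ctx;
//     ctx.capture_fp_settings();        // snapshot the mode into the context
//     tbb::parallel_for(0, 1000, [](int) {
//         /* floating-point work here runs under the captured FPU settings */
//     }, ctx);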

void task_group_context_impl::copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(!ctx.my_traits.fp_settings, "The context already has FPU settings.");
    __TBB_ASSERT(src.my_traits.fp_settings, "The source context does not have FPU settings.");

    const d1::cpu_ctl_env* src_ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env(*src_ctl);
    ctx.my_traits.fp_settings = true;
}

/*
    Comments:

1.  The premise of the cancellation support implementation is that cancellation is
    not part of the hot path of program execution. Therefore any change made to
    reduce the overhead of the cancellation control flow must not increase the
    overhead of normal execution.

    In general, contexts are used by all threads, and their descendants are created in
    different threads as well. To minimize the impact of cross-thread tree maintenance
    (first of all because of the synchronization it requires), the tree of contexts is
    split into pieces, each of which is handled by a single thread. Such pieces are
    represented as lists of contexts, whose members are the contexts that were bound
    to their parents in the given thread.

    The context tree maintenance and cancellation propagation algorithms are designed
    in such a manner that cross-thread access to a context list takes place only
    when a cancellation signal is sent (by the user or when an exception happens), and
    synchronization is necessary only then. Thus the normal execution flow (without
    exceptions and cancellation) remains free from any synchronization done on
    behalf of exception handling and cancellation support.

2.  Consider parallel cancellations at different levels of the context tree:

        Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
         |                                      |
        Ctx2                                    |- Thread1 started processing
         |                                   T1 |- Thread2 finishes and syncs up local counters
        Ctx3 <- Cancelled by Thread2            |
         |                                      |- Ctx5 is bound to Ctx2
        Ctx4                                    |
                                             T2 |- Thread1 reaches Ctx2

    The thread propagating each cancellation increments the global counter. However, the
    thread propagating the cancellation from the outermost context (Thread1) may be the
    last to finish. This means that the local counters may be synchronized earlier (by
    Thread2, at time T1) than the cancellation is propagated into Ctx2 (at time T2). If a
    new context (Ctx5) is created and bound to Ctx2 between T1 and T2, checking only its
    parent (Ctx2) may result in the cancellation request being lost.

    This issue is solved by doing the whole propagation under the lock.

    If we need more concurrency while processing parallel cancellations, we could try
    the following modification of the propagation algorithm:

    advance the global counter and remember it
    for each thread:
        scan the thread's list of contexts
    for each thread:
        sync up its local counter only if the global counter has not changed

    However, this version of the algorithm requires more analysis and verification.
*/
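
// A commented sketch of the alternative, more concurrent algorithm outlined in item 2
// above (illustrative only; it is not implemented here, and all_threads,
// scan_context_list and local_epoch are hypothetical stand-ins):
//
//     uintptr_t epoch_snapshot = ++the_context_state_propagation_epoch; // advance and remember
//     for (thread_data& td : all_threads)                               // first pass: propagate the state
//         scan_context_list(td, src, new_state);
//     for (thread_data& td : all_threads)                               // second pass: publish epochs,
//         if (the_context_state_propagation_epoch.load(std::memory_order_relaxed) == epoch_snapshot)
//             td.local_epoch.store(epoch_snapshot, std::memory_order_release); // but only if no newer
//                                                                              // propagation has started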

void __TBB_EXPORTED_FUNC initialize(d1::task_group_context& ctx) {
    task_group_context_impl::initialize(ctx);
}
void __TBB_EXPORTED_FUNC destroy(d1::task_group_context& ctx) {
    task_group_context_impl::destroy(ctx);
}
void __TBB_EXPORTED_FUNC reset(d1::task_group_context& ctx) {
    task_group_context_impl::reset(ctx);
}
bool __TBB_EXPORTED_FUNC cancel_group_execution(d1::task_group_context& ctx) {
    return task_group_context_impl::cancel_group_execution(ctx);
}
bool __TBB_EXPORTED_FUNC is_group_execution_cancelled(d1::task_group_context& ctx) {
    return task_group_context_impl::is_group_execution_cancelled(ctx);
}
void __TBB_EXPORTED_FUNC capture_fp_settings(d1::task_group_context& ctx) {
    task_group_context_impl::capture_fp_settings(ctx);
}

} // namespace r1
} // namespace detail
} // namespace tbb