xref: /oneTBB/src/tbb/task_group_context.cpp (revision 10e75e44)
/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/task_group.h"
#include "governor.h"
#include "thread_data.h"
#include "scheduler_common.h"
#include "itt_notify.h"
#include "task_dispatcher.h"

#include <type_traits>
namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------
tbb_exception_ptr* tbb_exception_ptr::allocate() noexcept {
    tbb_exception_ptr* eptr = (tbb_exception_ptr*)allocate_memory(sizeof(tbb_exception_ptr));
    return eptr ? new (eptr) tbb_exception_ptr(std::current_exception()) : nullptr;
}

void tbb_exception_ptr::destroy() noexcept {
    this->~tbb_exception_ptr();
    deallocate_memory(this);
}

void tbb_exception_ptr::throw_self() {
    if (governor::rethrow_exception_broken()) fix_broken_rethrow();
    std::rethrow_exception(my_ptr);
}

//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

void task_group_context_impl::destroy(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);

    if (ctx.my_context_list != nullptr) {
        __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) == d1::task_group_context::state::bound, nullptr);
        // The owner can be destroyed at any moment. Access the associated data with caution.
        ctx.my_context_list->remove(ctx.my_node);
    }
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
#if _MSC_VER && _MSC_VER <= 1900 && !__INTEL_COMPILER
    suppress_unused_warning(ctl);
#endif
    ctl->~cpu_ctl_env();

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
    }
    ITT_STACK_DESTROY(ctx.my_itt_caller);

    poison_pointer(ctx.my_parent);
    poison_pointer(ctx.my_context_list);
    poison_pointer(ctx.my_node.my_next_node);
    poison_pointer(ctx.my_node.my_prev_node);
    poison_pointer(ctx.my_exception);
    poison_pointer(ctx.my_itt_caller);

    ctx.my_state.store(d1::task_group_context::state::dead, std::memory_order_release);
}

void task_group_context_impl::initialize(d1::task_group_context& ctx) {
    ITT_TASK_GROUP(&ctx, ctx.my_name, nullptr);

    ctx.my_node.my_next_node = &ctx.my_node;
    ctx.my_node.my_prev_node = &ctx.my_node;
    ctx.my_cpu_ctl_env = 0;
    ctx.my_cancellation_requested = 0;
    ctx.my_may_have_children.store(0, std::memory_order_relaxed);
    // The 'created' state transitions to 'bound' or 'isolated' at the first use.
    ctx.my_state.store(d1::task_group_context::state::created, std::memory_order_relaxed);
    ctx.my_parent = nullptr;
    ctx.my_context_list = nullptr;
    ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    ctx.my_itt_caller = nullptr;

    static_assert(sizeof(d1::cpu_ctl_env) <= sizeof(ctx.my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t");
    d1::cpu_ctl_env* ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
    if (ctx.my_traits.fp_settings)
        ctl->get_env();
}

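// Note on initialize() above: the intrusive list node is deliberately self-linked, which
// marks a context that is not yet a member of any thread's context list; register_with()
// later links it into the owning thread's list, and destroy() removes it again before
// setting the state to 'dead'.
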
void task_group_context_impl::register_with(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(td, nullptr);
    ctx.my_context_list = td->my_context_list;

    ctx.my_context_list->push_front(ctx.my_node);
}

void task_group_context_impl::bind_to_impl(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) == d1::task_group_context::state::locked, "The context can be bound only under the lock.");
    __TBB_ASSERT(!ctx.my_parent, "Parent is set before initial binding");

    ctx.my_parent = td->my_task_dispatcher->m_execute_data_ext.context;
    __TBB_ASSERT(ctx.my_parent, nullptr);

    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if (!ctx.my_traits.fp_settings)
        copy_fp_settings(ctx, *ctx.my_parent);

    // The condition below prevents unnecessary thrashing of the parent context's cache line.
    if (ctx.my_parent->my_may_have_children.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children) {
        ctx.my_parent->my_may_have_children.store(d1::task_group_context::may_have_children, std::memory_order_relaxed); // full fence is below
    }
    if (ctx.my_parent->my_parent) {
        // Even if this context were made accessible for state change propagation
        // (by placing store_with_release(td->my_context_list_state.head.my_next, &ctx.my_node)
        // above), it could still be missed if state propagation from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, together with epoch counters that
        // detect the possibility of such a race, makes it possible to avoid taking
        // locks when there is no contention.

        // An acquire fence is necessary to prevent subsequent speculative loads of the
        // parent state data from being reordered out of the scope where the epoch
        // counter comparison can reliably validate them.
        uintptr_t local_count_snapshot = ctx.my_parent->my_context_list->epoch.load(std::memory_order_acquire);
        // Speculative propagation of the parent's state. The speculation will be
        // validated by the epoch counters check further on.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        register_with(ctx, td); // Issues full fence

        // If no state propagation is detected by the following condition, the full fence
        // above guarantees that the parent had the correct state during the speculative
        // propagation before the fence. Otherwise the propagation from the parent is
        // repeated under the lock.
        if (local_count_snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
            // Another thread may be propagating a state change right now, so resort to the lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
    } else {
        register_with(ctx, td); // Issues full fence
        // As we do not have grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, and thus it is safe to directly
        // copy the state from it.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }
}

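// The binding path above combines a speculative copy of the parent's state with an
// epoch check. A minimal sketch of the same pattern with hypothetical names (not the
// real members), shown here only to illustrate the idea:
//
//     uintptr_t snapshot = parent_list_epoch.load(std::memory_order_acquire);
//     my_flag.store(parent_flag.load(std::memory_order_relaxed),
//                   std::memory_order_relaxed);       // speculative copy
//     publish_to_context_list();                      // issues a full fence
//     if (snapshot != global_propagation_epoch.load(std::memory_order_relaxed)) {
//         // A propagation may have run while we were copying; redo the copy under the lock.
//         scoped_lock lock(propagation_mutex);
//         my_flag.store(parent_flag.load(std::memory_order_relaxed),
//                       std::memory_order_relaxed);
//     }
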
void task_group_context_impl::bind_to(d1::task_group_context& ctx, thread_data* td) {
    d1::task_group_context::state state = ctx.my_state.load(std::memory_order_acquire);
    if (state <= d1::task_group_context::state::locked) {
        if (state == d1::task_group_context::state::created &&
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
            ((std::atomic<typename std::underlying_type<d1::task_group_context::state>::type>&)ctx.my_state).compare_exchange_strong(
                (typename std::underlying_type<d1::task_group_context::state>::type&)state,
                (typename std::underlying_type<d1::task_group_context::state>::type)d1::task_group_context::state::locked)
#else
            ctx.my_state.compare_exchange_strong(state, d1::task_group_context::state::locked)
#endif
            ) {
            // If we are in the outermost task dispatch loop of an external thread, then
            // there is nothing to bind this context to, and we skip the binding part,
            // treating the context as isolated.
            __TBB_ASSERT(td->my_task_dispatcher->m_execute_data_ext.context != nullptr, nullptr);
            d1::task_group_context::state release_state{};
            if (td->my_task_dispatcher->m_execute_data_ext.context == td->my_arena->my_default_ctx || !ctx.my_traits.bound) {
                if (!ctx.my_traits.fp_settings) {
                    copy_fp_settings(ctx, *td->my_arena->my_default_ctx);
                }
                release_state = d1::task_group_context::state::isolated;
            } else {
                bind_to_impl(ctx, td);
                release_state = d1::task_group_context::state::bound;
            }
            ITT_STACK_CREATE(ctx.my_itt_caller);
            ctx.my_state.store(release_state, std::memory_order_release);
        }
        spin_wait_while_eq(ctx.my_state, d1::task_group_context::state::locked);
    }
    __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) != d1::task_group_context::state::created, nullptr);
    __TBB_ASSERT(ctx.my_state.load(std::memory_order_relaxed) != d1::task_group_context::state::locked, nullptr);
}

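// bind_to() above implements one-time lazy binding. A simplified sketch of the state
// protocol, with hypothetical names standing in for the real ones:
//
//     std::atomic<state> st{state::created};
//     ...
//     state expected = state::created;
//     if (st.compare_exchange_strong(expected, state::locked)) {
//         do_the_binding();                                    // exactly one thread gets here
//         st.store(state::bound, std::memory_order_release);   // or state::isolated
//     }
//     spin_wait_while_eq(st, state::locked);                   // everybody else waits for the winner
//
// The #if branch for __INTEL_COMPILER <= 1910 performs the same compare-and-swap through
// the enum's underlying integer type, presumably to work around a limitation of those
// compilers with atomics of scoped enumeration type.
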
void task_group_context_impl::propagate_task_group_state(d1::task_group_context& ctx, std::atomic<std::uint32_t> d1::task_group_context::* mptr_state, d1::task_group_context& src, std::uint32_t new_state) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    /*  1. if ((ctx.*mptr_state).load(std::memory_order_relaxed) == new_state):
            Nothing to do, whether descending from "src" or not, so no need to scan.
            Hopefully this happens often thanks to earlier invocations.
            This optimization is enabled by LIFO order in the context lists:
                - new contexts are bound to the beginning of lists;
                - descendants are newer than ancestors;
                - earlier invocations are therefore likely to "paint" long chains.
        2. if (&ctx != &src):
            This clause is disjoint from the traversal below, which skips src entirely.
            Note that src.*mptr_state is not necessarily still equal to new_state (another thread may have changed it again).
            Such interference is probably not frequent enough to aim for optimization by writing new_state again (to make the other thread back down).
            Letting the other thread prevail may also be fairer.
    */
    if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state && &ctx != &src) {
        for (d1::task_group_context* ancestor = ctx.my_parent; ancestor != nullptr; ancestor = ancestor->my_parent) {
            if (ancestor == &src) {
                for (d1::task_group_context* c = &ctx; c != ancestor; c = c->my_parent)
                    (c->*mptr_state).store(new_state, std::memory_order_relaxed);
                break;
            }
        }
    }
}

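// Example of the walk above: for a chain src <- A <- ctx (src is an ancestor of ctx),
// the outer loop finds src among ctx's ancestors, and the inner loop then stores
// new_state into ctx and A, stopping before src itself (src has already been updated
// by the caller, as noted in the comment above).
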
bool task_group_context_impl::cancel_group_execution(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_cancellation_requested.load(std::memory_order_relaxed) <= 1, "The cancellation state can be either 0 or 1");
    if (ctx.my_cancellation_requested.load(std::memory_order_relaxed) || ctx.my_cancellation_requested.exchange(1)) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant would inherit its parent's ctx.my_cancellation_requested,
        // not missing out on any cancellation still being propagated, and a context cannot be uncanceled.)
        return false;
    }
    governor::get_thread_data()->my_arena->my_threading_control->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));
    return true;
}

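// For reference, this is what a cancellation request typically looks like from user
// code. A hedged usage sketch (application-side, not part of this file; 'found_answer'
// and 'n' are hypothetical):
//
//     tbb::task_group_context ctx;
//     tbb::parallel_for(tbb::blocked_range<int>(0, n),
//         [&](const tbb::blocked_range<int>& r) {
//             if (found_answer)
//                 ctx.cancel_group_execution();   // ends up in the function above
//             // ... process r ...
//         },
//         ctx);
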
bool task_group_context_impl::is_group_execution_cancelled(const d1::task_group_context& ctx) {
    return ctx.my_cancellation_requested.load(std::memory_order_relaxed) != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::reset(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were used).

    auto exception = ctx.my_exception.load(std::memory_order_relaxed);
    if (exception) {
        exception->destroy();
        ctx.my_exception.store(nullptr, std::memory_order_relaxed);
    }
    ctx.my_cancellation_requested = 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::capture_fp_settings(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were used).
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
    if (!ctx.my_traits.fp_settings) {
        ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
        ctx.my_traits.fp_settings = true;
    }
    ctl->get_env();
}

void task_group_context_impl::copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(!ctx.my_traits.fp_settings, "The context already has FPU settings.");
    __TBB_ASSERT(src.my_traits.fp_settings, "The source context does not have FPU settings.");

    const d1::cpu_ctl_env* src_ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env(*src_ctl);
    ctx.my_traits.fp_settings = true;
}

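// Both functions above construct the cpu_ctl_env object in place inside
// ctx.my_cpu_ctl_env; the static_assert in initialize() guarantees that the object
// fits into that uint64_t-sized field.
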
/*
    Comments:

1.  The premise of the cancellation support implementation is that cancellations are
    not part of the hot path of program execution. Therefore, any change made to reduce
    the overhead of the cancellation control flow must not increase the overhead of
    normal execution.

    In general, contexts are used by all threads, and their descendants are created in
    different threads as well. To minimize the impact of cross-thread tree maintenance
    (primarily the synchronization it requires), the tree of contexts is split into
    pieces, each of which is handled by a single thread. Each piece is represented as a
    list of contexts whose members were bound to their parents in the given thread.

    The context tree maintenance and cancellation propagation algorithms are designed
    so that cross-thread access to a context list takes place only when a cancellation
    signal is sent (by the user or when an exception happens), and synchronization is
    necessary only then. Thus the normal execution flow (without exceptions and
    cancellation) remains free from any synchronization done on behalf of exception
    handling and cancellation support.

2.  Consider parallel cancellations at different levels of the context tree:

        Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
         |                                      |
        Ctx2                                    |- Thread1 started processing
         |                                   T1 |- Thread2 finishes and syncs up local counters
        Ctx3 <- Cancelled by Thread2            |
         |                                      |- Ctx5 is bound to Ctx2
        Ctx4                                    |
                                             T2 |- Thread1 reaches Ctx2

    The thread propagating each cancellation increments the global counter. However,
    the thread propagating the cancellation from the outermost context (Thread1) may be
    the last to finish, which means that the local counters may be synchronized (by
    Thread2, at time T1) earlier than the cancellation is propagated into Ctx2 (at time
    T2). If a new context (Ctx5) is created and bound to Ctx2 between T1 and T2,
    checking only its parent (Ctx2) may result in the cancellation request being lost.

    This issue is solved by doing the whole propagation under the lock.

    If we need more concurrency while processing parallel cancellations, we could try
    the following modification of the propagation algorithm:

    advance global counter and remember it
    for each thread:
        scan thread's list of contexts
    for each thread:
        sync up its local counter only if the global counter has not been changed

    However, this version of the algorithm requires more analysis and verification.
*/

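// Exported entry points; they simply forward to task_group_context_impl.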
void __TBB_EXPORTED_FUNC initialize(d1::task_group_context& ctx) {
    task_group_context_impl::initialize(ctx);
}
void __TBB_EXPORTED_FUNC destroy(d1::task_group_context& ctx) {
    task_group_context_impl::destroy(ctx);
}
void __TBB_EXPORTED_FUNC reset(d1::task_group_context& ctx) {
    task_group_context_impl::reset(ctx);
}
bool __TBB_EXPORTED_FUNC cancel_group_execution(d1::task_group_context& ctx) {
    return task_group_context_impl::cancel_group_execution(ctx);
}
bool __TBB_EXPORTED_FUNC is_group_execution_cancelled(d1::task_group_context& ctx) {
    return task_group_context_impl::is_group_execution_cancelled(ctx);
}
void __TBB_EXPORTED_FUNC capture_fp_settings(d1::task_group_context& ctx) {
    task_group_context_impl::capture_fp_settings(ctx);
}

} // namespace r1
} // namespace detail
} // namespace tbb