xref: /oneTBB/src/tbb/task_group_context.cpp (revision 9cef5777)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/task_group.h"
#include "governor.h"
#include "thread_data.h"
#include "scheduler_common.h"
#include "itt_notify.h"
#include "task_dispatcher.h"

#include <type_traits>

namespace tbb {
namespace detail {
namespace r1 {

//------------------------------------------------------------------------
// tbb_exception_ptr
//------------------------------------------------------------------------
tbb_exception_ptr* tbb_exception_ptr::allocate() noexcept {
    tbb_exception_ptr* eptr = (tbb_exception_ptr*)allocate_memory(sizeof(tbb_exception_ptr));
    return eptr ? new (eptr) tbb_exception_ptr(std::current_exception()) : nullptr;
}

void tbb_exception_ptr::destroy() noexcept {
    this->~tbb_exception_ptr();
    deallocate_memory(this);
}

void tbb_exception_ptr::throw_self() {
    if (governor::rethrow_exception_broken()) fix_broken_rethrow();
    std::rethrow_exception(my_ptr);
}
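
// A minimal lifecycle sketch of the helpers above (illustrative only; the real call
// sites live in the task dispatcher and context machinery, and run_user_body() below
// is a hypothetical placeholder):
//
//     tbb_exception_ptr* eptr = nullptr;
//     try {
//         run_user_body();
//     } catch (...) {
//         eptr = tbb_exception_ptr::allocate();   // captures std::current_exception()
//     }
//     if (eptr) {
//         try {
//             eptr->throw_self();                 // rethrows the captured exception
//         } catch (...) { /* observed by the waiting thread */ }
//         eptr->destroy();                        // placement destruction + deallocate_memory(); not 'delete'
//     }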

//------------------------------------------------------------------------
// task_group_context
//------------------------------------------------------------------------

void task_group_context_impl::destroy(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);

    if (ctx.my_context_list != nullptr) {
        __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::bound, nullptr);
        // The owner can be destroyed at any moment. Access the associated data with caution.
        ctx.my_context_list->remove(ctx.my_node);
    }
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
#if _MSC_VER && _MSC_VER <= 1900 && !__INTEL_COMPILER
    suppress_unused_warning(ctl);
#endif
    ctl->~cpu_ctl_env();

    if (ctx.my_exception)
        ctx.my_exception->destroy();
    ITT_STACK_DESTROY(ctx.my_itt_caller);

    poison_pointer(ctx.my_parent);
    poison_pointer(ctx.my_context_list);
    poison_pointer(ctx.my_node.my_next_node);
    poison_pointer(ctx.my_node.my_prev_node);
    poison_pointer(ctx.my_exception);
    poison_pointer(ctx.my_itt_caller);

    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::dead, std::memory_order_release);
}

void task_group_context_impl::initialize(d1::task_group_context& ctx) {
    ITT_TASK_GROUP(&ctx, ctx.my_name, nullptr);

    ctx.my_node.my_next_node = &ctx.my_node;
    ctx.my_node.my_prev_node = &ctx.my_node;
    ctx.my_cpu_ctl_env = 0;
    ctx.my_cancellation_requested = 0;
    ctx.my_state.store(0, std::memory_order_relaxed);
    // The context starts in the 'created' state and becomes 'bound' (or 'isolated') on first use.
    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::created, std::memory_order_relaxed);
    ctx.my_parent = nullptr;
    ctx.my_context_list = nullptr;
    ctx.my_exception = nullptr;
    ctx.my_itt_caller = nullptr;

    static_assert(sizeof(d1::cpu_ctl_env) <= sizeof(ctx.my_cpu_ctl_env), "FPU settings storage does not fit into uint64_t");
    d1::cpu_ctl_env* ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
    if (ctx.my_traits.fp_settings)
        ctl->get_env();
}
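
// Note: initialization above is deliberately cheap and leaves the context unbound
// (lifetime_state::created); attachment to the running task tree happens lazily in
// bind_to() on first use. A user-level view (illustrative sketch; 'n' and 'body' are
// placeholders, and the context-taking parallel_for overload is assumed from the
// public API):
//
//     oneapi::tbb::task_group_context ctx;
//     oneapi::tbb::parallel_for(0, n, body, ctx);   // bind_to() runs here, not at construction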

void task_group_context_impl::register_with(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(td, nullptr);
    ctx.my_context_list = td->my_context_list;

    ctx.my_context_list->push_front(ctx.my_node);
}

void task_group_context_impl::bind_to_impl(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) == d1::task_group_context::lifetime_state::locked, "The context can be bound only under the lock.");
    __TBB_ASSERT(!ctx.my_parent, "Parent is set before initial binding");

    ctx.my_parent = td->my_task_dispatcher->m_execute_data_ext.context;
    __TBB_ASSERT(ctx.my_parent, nullptr);

    // Inherit FPU settings only if the context has not captured FPU settings yet.
    if (!ctx.my_traits.fp_settings)
        copy_fp_settings(ctx, *ctx.my_parent);

    // The condition below prevents unnecessarily thrashing the parent context's cache line.
    if (ctx.my_parent->my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children) {
        ctx.my_parent->my_state.store(d1::task_group_context::may_have_children, std::memory_order_relaxed); // full fence is below
    }
    if (ctx.my_parent->my_parent) {
        // Even if this context were made accessible for state change propagation
        // (by placing store_with_release(td->my_context_list_state.head.my_next, &ctx.my_node)
        // above), the propagation could still be missed if a state change from a grand-ancestor
        // was underway concurrently with binding.
        // Speculative propagation from the parent, together with epoch counters that detect
        // the possibility of such a race, allows us to avoid taking locks when there is
        // no contention.

        // The acquire fence is necessary to prevent reordering of the subsequent speculative
        // loads of the parent's state out of the scope where the epoch counter comparison
        // can reliably validate them.
        uintptr_t local_count_snapshot = ctx.my_parent->my_context_list->epoch.load(std::memory_order_acquire);
        // Speculative propagation of parent's state. The speculation will be
        // validated by the epoch counters check further on.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        register_with(ctx, td); // Issues full fence

        // If no state propagation is detected by the condition below, the above full fence
        // guarantees that the parent had the correct state during the speculative propagation
        // before the fence. Otherwise the propagation from the parent is repeated under the lock.
        if (local_count_snapshot != the_context_state_propagation_epoch.load(std::memory_order_relaxed)) {
            // Another thread may be propagating a state change right now, so resort to the lock.
            context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
            ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
    } else {
        register_with(ctx, td); // Issues full fence
        // As we do not have grand-ancestors, concurrent state propagation (if any)
        // may originate only from the parent context, and thus it is safe to directly
        // copy the state from it.
        ctx.my_cancellation_requested.store(ctx.my_parent->my_cancellation_requested.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }

    ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::bound, std::memory_order_release);
}
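
// See comment 2 at the bottom of this file for a concrete interleaving that motivates
// the epoch snapshot/re-check above: without it, a cancellation propagated concurrently
// with binding could be missed by the newly bound child.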

void task_group_context_impl::bind_to(d1::task_group_context& ctx, thread_data* td) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    d1::task_group_context::lifetime_state state = ctx.my_lifetime_state.load(std::memory_order_acquire);
    if (state <= d1::task_group_context::lifetime_state::locked) {
        if (state == d1::task_group_context::lifetime_state::created &&
#if defined(__INTEL_COMPILER) && __INTEL_COMPILER <= 1910
            ((std::atomic<typename std::underlying_type<d1::task_group_context::lifetime_state>::type>&)ctx.my_lifetime_state).compare_exchange_strong(
            (typename std::underlying_type<d1::task_group_context::lifetime_state>::type&)state,
                (typename std::underlying_type<d1::task_group_context::lifetime_state>::type)d1::task_group_context::lifetime_state::locked)
#else
            ctx.my_lifetime_state.compare_exchange_strong(state, d1::task_group_context::lifetime_state::locked)
#endif
            ) {
            // If we are in the outermost task dispatch loop of an external thread, then
            // there is nothing to bind this context to, and we skip the binding part,
            // treating the context as isolated.
            __TBB_ASSERT(td->my_task_dispatcher->m_execute_data_ext.context != nullptr, nullptr);
            if (td->my_task_dispatcher->m_execute_data_ext.context == td->my_arena->my_default_ctx || !ctx.my_traits.bound) {
                if (!ctx.my_traits.fp_settings) {
                    copy_fp_settings(ctx, *td->my_arena->my_default_ctx);
                }
                ctx.my_lifetime_state.store(d1::task_group_context::lifetime_state::isolated, std::memory_order_release);
            } else {
                bind_to_impl(ctx, td);
            }
            ITT_STACK_CREATE(ctx.my_itt_caller);
        }
        spin_wait_while_eq(ctx.my_lifetime_state, d1::task_group_context::lifetime_state::locked);
    }
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::created, nullptr);
    __TBB_ASSERT(ctx.my_lifetime_state.load(std::memory_order_relaxed) != d1::task_group_context::lifetime_state::locked, nullptr);
}
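
// Simplified sketch of the lifetime_state transitions driven by bind_to() (only the
// states referenced in this translation unit are shown):
//
//     created --CAS wins--> locked --bind_to_impl()-------------------> bound --destroy()--> dead
//                              \--(outermost dispatch or unbound trait)--> isolated
//
// Only the thread that wins the compare_exchange_strong performs the binding; every
// other caller spins in spin_wait_while_eq() until the state leaves 'locked'.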

template <typename T>
void task_group_context_impl::propagate_task_group_state(d1::task_group_context& ctx, std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    /*  1. if ((ctx.*mptr_state).load(std::memory_order_relaxed) == new_state):
            Nothing to do, whether ctx descends from "src" or not, so there is no need to scan.
            Hopefully this happens often thanks to earlier invocations.
            This optimization is enabled by the LIFO order in the context lists:
                - new contexts are bound to the beginning of lists;
                - descendants are newer than ancestors;
                - earlier invocations are therefore likely to "paint" long chains.
        2. if (&ctx != &src):
            This clause is separate from the traversal below, which never writes to src itself.
            Note that src.*mptr_state is not necessarily still equal to new_state (another thread may have changed it again).
            Such interference is probably not frequent enough to justify writing new_state again just to make the other thread back down.
            Letting the other thread prevail may also be fairer.
    */
    if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state && &ctx != &src) {
        for (d1::task_group_context* ancestor = ctx.my_parent; ancestor != nullptr; ancestor = ancestor->my_parent) {
            if (ancestor == &src) {
                for (d1::task_group_context* c = &ctx; c != ancestor; c = c->my_parent)
                    (c->*mptr_state).store(new_state, std::memory_order_relaxed);
                break;
            }
        }
    }
}
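
// Illustrative example of the walk above: for an ancestor chain src <- A <- B <- ctx,
// propagate_task_group_state(ctx, mptr_state, src, new_state) finds src in the outer
// loop, and the inner loop then paints ctx, B, and A (but not src itself, whose state
// the caller has already updated). If src is not among ctx's ancestors, nothing is written.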

template <typename T>
void thread_data::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    mutex::scoped_lock lock(my_context_list->m_mutex);
    // The acquire fence (provided by the lock above) is necessary to ensure that a subsequent
    // node->my_next load returns the correct value in case the node was just inserted by
    // another thread. The fence also ensures visibility of the correct ctx.my_parent value.
    for (context_list::iterator it = my_context_list->begin(); it != my_context_list->end(); ++it) {
        d1::task_group_context& ctx = __TBB_get_object_ref(d1::task_group_context, my_node, &(*it));
        if ((ctx.*mptr_state).load(std::memory_order_relaxed) != new_state)
            task_group_context_impl::propagate_task_group_state(ctx, mptr_state, src, new_state);
    }
    // Sync up the local propagation epoch with the global one. The release fence prevents
    // a possible store to *mptr_state from being reordered past the sync point.
    my_context_list->epoch.store(the_context_state_propagation_epoch.load(std::memory_order_relaxed), std::memory_order_release);
}
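
// Publishing the observed global epoch above is what lets bind_to_impl() avoid the
// propagation lock in the common case: a freshly bound context compares its list's
// epoch snapshot with the_context_state_propagation_epoch and re-reads the parent's
// state under the lock only when they differ (i.e. when a propagation may have raced
// with the binding).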

template <typename T>
bool market::propagate_task_group_state(std::atomic<T> d1::task_group_context::* mptr_state, d1::task_group_context& src, T new_state) {
    if (src.my_state.load(std::memory_order_relaxed) != d1::task_group_context::may_have_children)
        return true;
    // The whole propagation algorithm is executed under the lock in order to ensure correctness
    // in case of concurrent state changes at different levels of the context tree.
    // See comment 2 at the bottom of this file.
    context_state_propagation_mutex_type::scoped_lock lock(the_context_state_propagation_mutex);
    if ((src.*mptr_state).load(std::memory_order_relaxed) != new_state)
        // Another thread has concurrently changed the state. Back down.
        return false;
    // Advance global state propagation epoch
    ++the_context_state_propagation_epoch;
    // Propagate to all workers and external threads and sync up their local epochs with the global one
    unsigned num_workers = my_first_unused_worker_idx;
    for (unsigned i = 0; i < num_workers; ++i) {
        thread_data* td = my_workers[i];
        // If the worker is only about to be registered, skip it.
        if (td)
            td->propagate_task_group_state(mptr_state, src, new_state);
    }
    // Propagate to all external threads
    // The whole propagation sequence is locked, thus no contention is expected
    for (thread_data_list_type::iterator it = my_masters.begin(); it != my_masters.end(); it++)
        it->propagate_task_group_state(mptr_state, src, new_state);
    return true;
}
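
// Note on the early return above: my_state is only ever set to may_have_children in
// bind_to_impl(), so a context that has never had a child bound to it has no descendants
// in any thread's context list, and the walk (and the lock) can be skipped. A child that
// is being bound concurrently re-checks the parent's state via the epoch mechanism in
// bind_to_impl().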

bool task_group_context_impl::cancel_group_execution(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(ctx.my_cancellation_requested.load(std::memory_order_relaxed) <= 1, "The cancellation state can be either 0 or 1");
    if (ctx.my_cancellation_requested.load(std::memory_order_relaxed) || ctx.my_cancellation_requested.exchange(1)) {
        // This task group and any descendants have already been canceled.
        // (A newly added descendant would inherit its parent's ctx.my_cancellation_requested,
        // not missing out on any cancellation still being propagated, and a context cannot be uncanceled.)
        return false;
    }
    governor::get_thread_data()->my_arena->my_market->propagate_task_group_state(&d1::task_group_context::my_cancellation_requested, ctx, uint32_t(1));
    return true;
}
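
// User-level view (illustrative sketch; 'n' and should_stop() are placeholders, and the
// public task_group_context::cancel_group_execution() method is assumed to reach the
// exported r1::cancel_group_execution() defined below):
//
//     oneapi::tbb::task_group_context ctx;
//     oneapi::tbb::parallel_for(0, n, [&](int i) {
//         if (should_stop(i))
//             ctx.cancel_group_execution();       // the first caller wins the exchange(1)
//     }, ctx);
//     bool cancelled = ctx.is_group_execution_cancelled();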

bool task_group_context_impl::is_group_execution_cancelled(const d1::task_group_context& ctx) {
    return ctx.my_cancellation_requested.load(std::memory_order_relaxed) != 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::reset(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were already used).
    if (ctx.my_exception) {
        ctx.my_exception->destroy();
        ctx.my_exception = nullptr;
    }
    ctx.my_cancellation_requested = 0;
}

// IMPORTANT: It is assumed that this method is not used concurrently!
void task_group_context_impl::capture_fp_settings(d1::task_group_context& ctx) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    //! TODO: Add assertion that this context does not have children
    // No fences are necessary since this context can be accessed from another thread
    // only after stealing has happened (which means the necessary fences were already used).
    d1::cpu_ctl_env* ctl = reinterpret_cast<d1::cpu_ctl_env*>(&ctx.my_cpu_ctl_env);
    if (!ctx.my_traits.fp_settings) {
        ctl = new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env;
        ctx.my_traits.fp_settings = true;
    }
    ctl->get_env();
}

void task_group_context_impl::copy_fp_settings(d1::task_group_context& ctx, const d1::task_group_context& src) {
    __TBB_ASSERT(!is_poisoned(ctx.my_context_list), nullptr);
    __TBB_ASSERT(!ctx.my_traits.fp_settings, "The context already has FPU settings.");
    __TBB_ASSERT(src.my_traits.fp_settings, "The source context does not have FPU settings.");

    const d1::cpu_ctl_env* src_ctl = reinterpret_cast<const d1::cpu_ctl_env*>(&src.my_cpu_ctl_env);
    new (&ctx.my_cpu_ctl_env) d1::cpu_ctl_env(*src_ctl);
    ctx.my_traits.fp_settings = true;
}
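
// Relationship between the two FPU-related paths above: capture_fp_settings() samples the
// calling thread's CPU control state (via cpu_ctl_env::get_env()) into the context and sets
// the fp_settings trait, while copy_fp_settings() is used during binding (see bind_to() and
// bind_to_impl()) to inherit settings already captured by the parent or by the arena's
// default context.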

/*
    Comments:

1.  The premise of the cancellation support implementation is that cancellations are
    not part of the hot path of program execution. Therefore any change made to reduce
    the overhead of the cancellation control flow must not increase the overhead of
    normal execution.

    In general, contexts are used by all threads, and their descendants are created in
    different threads as well. To minimize the impact of cross-thread tree maintenance
    (primarily the synchronization it requires), the tree of contexts is split into
    pieces, each of which is handled by a single thread. Such a piece is represented
    as a list of contexts, whose members are the contexts that were bound to their
    parents in the given thread.

    The context tree maintenance and cancellation propagation algorithms are designed
    so that cross-thread access to a context list takes place only when a cancellation
    signal is sent (by the user or when an exception happens), and synchronization is
    necessary only then. Thus the normal execution flow (without exceptions and
    cancellation) remains free from any synchronization done on behalf of exception
    handling and cancellation support.

2.  Consider parallel cancellations at different levels of the context tree:

        Ctx1 <- Cancelled by Thread1            |- Thread2 started processing
         |                                      |
        Ctx2                                    |- Thread1 started processing
         |                                   T1 |- Thread2 finishes and syncs up local counters
        Ctx3 <- Cancelled by Thread2            |
         |                                      |- Ctx5 is bound to Ctx2
        Ctx4                                    |
                                             T2 |- Thread1 reaches Ctx2

    The thread propagating each cancellation increments the global counter. However, the
    thread propagating the cancellation from the outermost context (Thread1) may be the
    last to finish, which means that the local counters may be synchronized earlier (by
    Thread2, at Time1) than Thread1 propagates the cancellation into Ctx2 (at Time2). If
    a new context (Ctx5) is created and bound to Ctx2 between Time1 and Time2, checking
    only its parent (Ctx2) may result in the cancellation request being lost.

    This issue is solved by doing the whole propagation under the lock.

    If we need more concurrency while processing parallel cancellations, we could try
    the following modification of the propagation algorithm:

    advance the global counter and remember it
    for each thread:
        scan the thread's list of contexts
    for each thread:
        sync up its local counter only if the global counter has not changed

    However, this version of the algorithm requires more analysis and verification.
*/

void __TBB_EXPORTED_FUNC initialize(d1::task_group_context& ctx) {
    task_group_context_impl::initialize(ctx);
}
void __TBB_EXPORTED_FUNC destroy(d1::task_group_context& ctx) {
    task_group_context_impl::destroy(ctx);
}
void __TBB_EXPORTED_FUNC reset(d1::task_group_context& ctx) {
    task_group_context_impl::reset(ctx);
}
bool __TBB_EXPORTED_FUNC cancel_group_execution(d1::task_group_context& ctx) {
    return task_group_context_impl::cancel_group_execution(ctx);
}
bool __TBB_EXPORTED_FUNC is_group_execution_cancelled(d1::task_group_context& ctx) {
    return task_group_context_impl::is_group_execution_cancelled(ctx);
}
void __TBB_EXPORTED_FUNC capture_fp_settings(d1::task_group_context& ctx) {
    task_group_context_impl::capture_fp_settings(ctx);
}

} // namespace r1
} // namespace detail
} // namespace tbb