/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_task_dispatcher_H
#define _TBB_task_dispatcher_H

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/global_control.h"

#include "scheduler_common.h"
#include "waiters.h"
#include "arena_slot.h"
#include "arena.h"
#include "thread_data.h"
#include "mailbox.h"
#include "itt_notify.h"
#include "concurrent_monitor.h"
#include "threading_control.h"

#include <atomic>

#if !__TBB_CPU_CTL_ENV_PRESENT
#include <fenv.h>
#endif

namespace tbb {
namespace detail {
namespace r1 {

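// Returns the resume task of this slot's default task dispatcher when its suspend point exists
// and the owner thread has been recalled; returns nullptr otherwise (or when resumable tasks
// are disabled).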
inline d1::task* get_self_recall_task(arena_slot& slot) {
    suppress_unused_warning(slot);
    d1::task* t = nullptr;
#if __TBB_RESUMABLE_TASKS
    suspend_point_type* sp = slot.default_task_dispatcher().m_suspend_point;
    if (sp && sp->m_is_owner_recalled.load(std::memory_order_acquire)) {
        t = &sp->m_resume_task;
        __TBB_ASSERT(sp->m_resume_task.m_target.m_thread_data == nullptr, nullptr);
    }
#endif /* __TBB_RESUMABLE_TASKS */
    return t;
}

// Defined in exception.cpp
/*[[noreturn]]*/void do_throw_noexcept(void (*throw_exception)()) noexcept;

//------------------------------------------------------------------------
// Suspend point
//------------------------------------------------------------------------
#if __TBB_RESUMABLE_TASKS

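// Executed on the stack that is being left. For an external waiter (wait_ctx is set), the thread
// registers itself in the arena's waiting-threads monitor and may leave the current stack
// abandoned until waiting completes; otherwise a notification is scheduled as the post-resume
// action. Unless the stack is abandoned in the monitor, the dispatcher then switches to the
// target suspend point.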
inline d1::task* suspend_point_type::resume_task::execute(d1::execution_data& ed) {
    execution_data_ext& ed_ext = static_cast<execution_data_ext&>(ed);

    if (ed_ext.wait_ctx) {
        thread_control_monitor::resume_context monitor_node{{std::uintptr_t(ed_ext.wait_ctx), nullptr}, ed_ext, m_target};
        // The wait_ctx is present only in external_waiter. In that case we leave the current stack
        // in the abandoned state to resume when waiting completes.
        thread_data* td = ed_ext.task_disp->m_thread_data;
        td->set_post_resume_action(task_dispatcher::post_resume_action::register_waiter, &monitor_node);

        thread_control_monitor& wait_list = td->my_arena->get_waiting_threads_monitor();

        if (wait_list.wait([&] { return !ed_ext.wait_ctx->continue_execution(); }, monitor_node)) {
            return nullptr;
        }

        td->clear_post_resume_action();
        r1::resume(ed_ext.task_disp->get_suspend_point());
    } else {
        // If wait_ctx is null, this can only be a worker thread on the outermost level because
        // coroutine_waiter interrupts the bypass loop before the resume_task execution.
        ed_ext.task_disp->m_thread_data->set_post_resume_action(task_dispatcher::post_resume_action::notify,
            ed_ext.task_disp->get_suspend_point());
    }
    // Do not access this task after this point because it might be destroyed
    ed_ext.task_disp->resume(m_target);
    return nullptr;
}

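// Constructs a suspend point bound to the given arena: the coroutine context gets the requested
// stack size, and the resume task is tied to the arena's default task group context.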
inline suspend_point_type::suspend_point_type(arena* a, size_t stack_size, task_dispatcher& task_disp)
    : m_arena(a)
    , m_random(this)
    , m_co_context(stack_size, &task_disp)
    , m_resume_task(task_disp)
{
    assert_pointer_valid(m_arena);
    assert_pointer_valid(m_arena->my_default_ctx);
    task_accessor::context(m_resume_task) = m_arena->my_default_ctx;
    task_accessor::isolation(m_resume_task) = no_isolation;
    // Initialize the itt_caller for the context of the resume task.
    // It will be bound to the stack of the first suspend call.
    task_group_context_impl::bind_to(*task_accessor::context(m_resume_task), task_disp.m_thread_data);
}

#endif /* __TBB_RESUMABLE_TASKS */

//------------------------------------------------------------------------
// Task Dispatcher
//------------------------------------------------------------------------
inline task_dispatcher::task_dispatcher(arena* a) {
    m_execute_data_ext.context = a->my_default_ctx;
    m_execute_data_ext.task_disp = this;
}

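// Stealing is allowed only while the address of a local stack anchor stays above
// m_stealing_threshold, i.e. the current stack has not grown too deep to take a stolen task.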
inline bool task_dispatcher::can_steal() {
    __TBB_ASSERT(m_stealing_threshold != 0, nullptr);
    stack_anchor_type anchor{};
    return reinterpret_cast<std::uintptr_t>(&anchor) > m_stealing_threshold;
}

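// Returns a critical task if one is available, otherwise a task mailed to this thread via the
// task-to-thread affinity mechanism. An empty mailbox is a fast exit; if isolation hides the
// mailed proxies, the inbox is unmarked as idle so they stay stealable from the owner's task pool.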
inline d1::task* task_dispatcher::get_inbox_or_critical_task(
    execution_data_ext& ed, mail_inbox& inbox, isolation_type isolation, bool critical_allowed)
{
    if (inbox.empty())
        return nullptr;
    d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
    if (result)
        return result;
    // Check if there are tasks mailed to this thread via the task-to-thread affinity mechanism.
    result = get_mailbox_task(inbox, ed, isolation);
    // There is a race with a thread adding a new task (possibly with suitable isolation)
    // to our mailbox, so the below conditions might result in a false positive.
    // Then set_is_idle(false) allows that task to be stolen; it's OK.
    if (isolation != no_isolation && !result && !inbox.empty() && inbox.is_idle_state(true)) {
        // We have proxy tasks in our mailbox but the isolation blocks their execution.
        // So publish the proxy tasks in the mailbox to make them available for stealing
        // from the owner's task pool.
        inbox.set_is_idle(false);
    }
    return result;
}

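// Returns a critical task if one is available, otherwise a task from the given arena stream
// (resume or FIFO). An empty stream is a fast exit.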
inline d1::task* task_dispatcher::get_stream_or_critical_task(
    execution_data_ext& ed, arena& a, task_stream<front_accessor>& stream, unsigned& hint,
    isolation_type isolation, bool critical_allowed)
{
    if (stream.empty())
        return nullptr;
    d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
    if (result)
        return result;
    return a.get_stream_task(stream, hint);
}

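// Tries to steal a task from another arena slot. On success the execution data is updated, and a
// critical task still takes priority: get_critical_task may respawn the stolen task and return
// the critical one instead.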
inline d1::task* task_dispatcher::steal_or_get_critical(
    execution_data_ext& ed, arena& a, unsigned arena_index, FastRandom& random,
    isolation_type isolation, bool critical_allowed)
{
    if (d1::task* t = a.steal_task(arena_index, random, ed, isolation)) {
        ed.context = task_accessor::context(*t);
        ed.isolation = task_accessor::isolation(*t);
        return get_critical_task(t, ed, isolation, critical_allowed);
    }
    return nullptr;
}

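// Non-local task retrieval loop used when the local task pool is exhausted: polls the mailbox,
// the resume stream, the starvation-resistant FIFO stream, and other arena slots (stealing),
// giving critical tasks priority, and pauses via the waiter between unsuccessful rounds.
// Returns nullptr when the waiter decides the thread should leave the dispatch loop.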
template <bool ITTPossible, typename Waiter>
d1::task* task_dispatcher::receive_or_steal_task(
    thread_data& tls, execution_data_ext& ed, Waiter& waiter, isolation_type isolation,
    bool fifo_allowed, bool critical_allowed)
{
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    // Task to return
    d1::task* t = nullptr;
    // Get tls data (again)
    arena& a = *tls.my_arena;
    arena_slot& slot = *tls.my_arena_slot;
    unsigned arena_index = tls.my_arena_index;
    mail_inbox& inbox = tls.my_inbox;
    task_stream<front_accessor>& resume_stream = a.my_resume_task_stream;
    unsigned& resume_hint = slot.hint_for_resume_stream;
    task_stream<front_accessor>& fifo_stream = a.my_fifo_task_stream;
    unsigned& fifo_hint = slot.hint_for_fifo_stream;

    waiter.reset_wait();
    // The thread is in the idle state now
    inbox.set_is_idle(true);

    bool stealing_is_allowed = can_steal();

    // Stealing loop: mailbox / enqueue / other slots
    for (;;) {
        __TBB_ASSERT(t == nullptr, nullptr);
        // Check if the resource manager requires our arena to relinquish some threads.
        // For the external thread, restore the idle state to true after the dispatch loop.
        if (!waiter.continue_execution(slot, t)) {
            __TBB_ASSERT(t == nullptr, nullptr);
            break;
        }
        // Start searching
        if (t != nullptr) {
            // continue_execution returned a task
        }
        else if ((t = get_inbox_or_critical_task(ed, inbox, isolation, critical_allowed))) {
            // Successfully got a task from the mailbox or the critical stream
        }
        else if ((t = get_stream_or_critical_task(ed, a, resume_stream, resume_hint, isolation, critical_allowed))) {
            // Successfully got a resume or critical task
        }
        else if (fifo_allowed && isolation == no_isolation
                 && (t = get_stream_or_critical_task(ed, a, fifo_stream, fifo_hint, isolation, critical_allowed))) {
            // Got a task from the starvation-resistant (FIFO) stream. This is only allowed
            // at the outermost dispatch level without isolation.
        }
        else if (stealing_is_allowed
                 && (t = steal_or_get_critical(ed, a, arena_index, tls.my_random, isolation, critical_allowed))) {
            // Stole a task from a random arena slot
        }
        else {
            t = get_critical_task(t, ed, isolation, critical_allowed);
        }

        if (t != nullptr) {
            ed.context = task_accessor::context(*t);
            ed.isolation = task_accessor::isolation(*t);
            a.my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
            break; // Task found, end of the retrieval attempt
        }
        // Nothing to do, pause a little.
        waiter.pause(slot);
    } // end of nonlocal task retrieval loop

    __TBB_ASSERT(is_alive(a.my_guard), nullptr);
    if (inbox.is_idle_state(true)) {
        inbox.set_is_idle(false);
    }
    return t;
}

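// The main dispatch loop. Executes the given task and everything it bypasses, drains the local
// task pool, and falls back to receive_or_steal_task for non-local work. dispatch_loop_guard
// restores the outer execution state on exit; an exception escaping a task cancels its task
// group (or terminates the process if terminate_on_exception is set).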
template <bool ITTPossible, typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    assert_pointer_valid(m_thread_data);
    __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);

    // Guard the outer/default execution state
    struct dispatch_loop_guard {
        task_dispatcher& task_disp;
        execution_data_ext old_execute_data_ext;
        properties old_properties;

        ~dispatch_loop_guard() {
            task_disp.m_execute_data_ext = old_execute_data_ext;
            task_disp.m_properties = old_properties;

            __TBB_ASSERT(task_disp.m_thread_data && governor::is_thread_data_set(task_disp.m_thread_data), nullptr);
            __TBB_ASSERT(task_disp.m_thread_data->my_task_dispatcher == &task_disp, nullptr);
        }
    } dl_guard{ *this, m_execute_data_ext, m_properties };

    // The context guard to track the FP settings and ITT tasks.
    context_guard_helper</*report_tasks=*/ITTPossible> context_guard;

    // Current isolation context
    const isolation_type isolation = dl_guard.old_execute_data_ext.isolation;

    // Critical work inflection point. Once it turns false, the current execution context has
    // taken a critical task on a previous stack frame and cannot take more until that critical
    // path is finished.
    bool critical_allowed = dl_guard.old_properties.critical_task_allowed;

    // Extended execution data that is used for dispatching.
    // The base version is passed to the task::execute method.
    execution_data_ext& ed = m_execute_data_ext;
    ed.context = t ? task_accessor::context(*t) : nullptr;
    ed.original_slot = m_thread_data->my_arena_index;
    ed.affinity_slot = d1::no_slot;
    ed.task_disp = this;
    ed.wait_ctx = waiter.wait_ctx();

    m_properties.outermost = false;
    m_properties.fifo_tasks_allowed = false;

    t = get_critical_task(t, ed, isolation, critical_allowed);
    if (t && m_thread_data->my_inbox.is_idle_state(true)) {
        // The thread has work to do. Therefore, mark its inbox as not idle so that
        // affinitized tasks can be stolen from it.
        m_thread_data->my_inbox.set_is_idle(false);
    }

    // Infinite exception loop
    for (;;) {
        try {
            // Main execution loop
            do {
                // We assume that bypass tasks are from the same task group.
                context_guard.set_ctx(ed.context);
                // The inner level evaluates tasks coming from nesting loops and those returned
                // by just executed tasks (bypassing spawn or enqueue calls).
                while (t != nullptr) {
                    assert_task_valid(t);
                    assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
                    __TBB_ASSERT(ed.context->my_state == d1::task_group_context::state::bound ||
                        ed.context->my_state == d1::task_group_context::state::isolated, nullptr);
                    __TBB_ASSERT(m_thread_data->my_inbox.is_idle_state(false), nullptr);
                    __TBB_ASSERT(task_accessor::is_resume_task(*t) || isolation == no_isolation || isolation == ed.isolation, nullptr);
                    // Check for a premature leave
                    if (Waiter::postpone_execution(*t)) {
                        __TBB_ASSERT(task_accessor::is_resume_task(*t) && dl_guard.old_properties.outermost,
                            "Currently, the bypass loop can be interrupted only for resume task on outermost level");
                        return t;
                    }
                    // Copy itt_caller to the stack because the context might be destroyed after t->execute.
                    void* itt_caller = ed.context->my_itt_caller;
                    suppress_unused_warning(itt_caller);

                    ITT_CALLEE_ENTER(ITTPossible, t, itt_caller);

                    if (ed.context->is_group_execution_cancelled()) {
                        t = t->cancel(ed);
                    } else {
                        t = t->execute(ed);
                    }

                    ITT_CALLEE_LEAVE(ITTPossible, itt_caller);

                    // The task affinity in execution data is set for affinitized tasks.
                    // So drop it after the task execution.
                    ed.affinity_slot = d1::no_slot;
                    // Reset the task owner id for the bypassed task
                    ed.original_slot = m_thread_data->my_arena_index;
                    t = get_critical_task(t, ed, isolation, critical_allowed);
                }
                __TBB_ASSERT(m_thread_data && governor::is_thread_data_set(m_thread_data), nullptr);
                __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);
                // When refactoring, note that m_thread_data can change after t->execute()
                __TBB_ASSERT(m_thread_data->my_arena_slot != nullptr, nullptr);
                arena_slot& slot = *m_thread_data->my_arena_slot;
                if (!waiter.continue_execution(slot, t)) {
                    break;
                }
                // Retrieve a task from the local task pool
                if (t || (slot.is_task_pool_published() && (t = slot.get_task(ed, isolation)))) {
                    __TBB_ASSERT(ed.original_slot == m_thread_data->my_arena_index, nullptr);
                    ed.context = task_accessor::context(*t);
                    ed.isolation = task_accessor::isolation(*t);
                    continue;
                }
                // Retrieve a task from global sources
                t = receive_or_steal_task<ITTPossible>(
                    *m_thread_data, ed, waiter, isolation, dl_guard.old_properties.fifo_tasks_allowed,
                    critical_allowed
                );
            } while (t != nullptr); // main dispatch loop
            break; // Exit the exception loop
        } catch (...) {
            if (global_control::active_value(global_control::terminate_on_exception) == 1) {
                do_throw_noexcept([] { throw; });
            }
            if (ed.context->cancel_group_execution()) {
                /* We are the first to signal cancellation, so store the exception that caused it. */
                ed.context->my_exception.store(tbb_exception_ptr::allocate(), std::memory_order_release);
            }
        }
    } // Infinite exception loop
    __TBB_ASSERT(t == nullptr, nullptr);

#if __TBB_RESUMABLE_TASKS
    if (dl_guard.old_properties.outermost) {
        recall_point();
    }
#endif /* __TBB_RESUMABLE_TASKS */

    return nullptr;
}

#if __TBB_RESUMABLE_TASKS
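// Called at the end of the outermost dispatch level. If this is not the slot's default dispatcher
// (the thread runs on a coroutine stack), the thread suspends with a notify post-resume action so
// that it can eventually return to its original stack.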
inline void task_dispatcher::recall_point() {
    if (this != &m_thread_data->my_arena_slot->default_task_dispatcher()) {
        __TBB_ASSERT(m_suspend_point != nullptr, nullptr);
        __TBB_ASSERT(m_suspend_point->m_is_owner_recalled.load(std::memory_order_relaxed) == false, nullptr);

        m_thread_data->set_post_resume_action(post_resume_action::notify, get_suspend_point());
        internal_suspend();

        if (m_thread_data->my_inbox.is_idle_state(true)) {
            m_thread_data->my_inbox.set_is_idle(false);
        }
    }
}
#endif /* __TBB_RESUMABLE_TASKS */

#if __TBB_PREVIEW_CRITICAL_TASKS
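// Returns a critical task when critical work is allowed on this stack and one is available in the
// arena's critical stream. A previously chosen task t is spawned back in that case. Nested
// critical tasks are prevented by clearing critical_task_allowed until the current one finishes.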
inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext& ed, isolation_type isolation, bool critical_allowed) {
    __TBB_ASSERT( critical_allowed || !m_properties.critical_task_allowed, nullptr );

    if (!critical_allowed) {
        // The stack is already in the process of critical path execution. It cannot take more
        // critical work until it finishes the current one.
        __TBB_ASSERT(!m_properties.critical_task_allowed, nullptr);
        return t;
    }

    assert_pointers_valid(m_thread_data, m_thread_data->my_arena, m_thread_data->my_arena_slot);
    thread_data& td = *m_thread_data;
    arena& a = *td.my_arena;
    arena_slot& slot = *td.my_arena_slot;

    d1::task* crit_t = a.get_critical_task(slot.hint_for_critical_stream, isolation);
    if (crit_t != nullptr) {
        assert_task_valid(crit_t);
        if (t != nullptr) {
            assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
            r1::spawn(*t, *ed.context);
        }
        ed.context = task_accessor::context(*crit_t);
        ed.isolation = task_accessor::isolation(*crit_t);

        // We cannot execute more than one critical task on the same stack.
        // In other words, we prevent nested critical tasks.
        m_properties.critical_task_allowed = false;

        // TODO: add a test that the observer is called when a critical task is taken.
        a.my_observers.notify_entry_observers(td.my_last_observer, td.my_is_worker);
        t = crit_t;
    } else {
        // We were unable to find critical work in the queue. Allow inspecting the queue in nested
        // invocations. This handles the case when a critical task has just been completed.
        m_properties.critical_task_allowed = true;
    }
    return t;
}
#else
inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext&, isolation_type, bool /*critical_allowed*/) {
    return t;
}
#endif

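// Pops task proxies from the mailbox until one still carries a task (a proxy may have already
// been consumed from the task pool side); exhausted proxies are destroyed along the way.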
inline d1::task* task_dispatcher::get_mailbox_task(mail_inbox& my_inbox, execution_data_ext& ed, isolation_type isolation) {
    while (task_proxy* const tp = my_inbox.pop(isolation)) {
        if (d1::task* result = tp->extract_task<task_proxy::mailbox_bit>()) {
            ed.original_slot = (unsigned short)(-2);
            ed.affinity_slot = ed.task_disp->m_thread_data->my_arena_index;
            return result;
        }
        // We have exclusive access to the proxy, and can destroy it.
        tp->allocator.delete_object(tp, ed);
    }
    return nullptr;
}

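// Dispatches to the ITT-instrumented or the plain variant depending on whether ITT tracing is present.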
template <typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    if (governor::is_itt_present()) {
        return local_wait_for_all</*ITTPossible = */ true>(t, waiter);
    } else {
        return local_wait_for_all</*ITTPossible = */ false>(t, waiter);
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // _TBB_task_dispatcher_H