/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef _TBB_task_dispatcher_H
#define _TBB_task_dispatcher_H

#include "oneapi/tbb/detail/_utils.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/global_control.h"

#include "scheduler_common.h"
#include "waiters.h"
#include "arena_slot.h"
#include "arena.h"
#include "thread_data.h"
#include "mailbox.h"
#include "itt_notify.h"
#include "concurrent_monitor.h"
#include "threading_control.h"

#include <atomic>

#if !__TBB_CPU_CTL_ENV_PRESENT
#include <fenv.h>
#endif

namespace tbb {
namespace detail {
namespace r1 {

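// Returns the resume task of the slot's default task dispatcher once the owner of its
// suspend point has been recalled (resumable tasks only); otherwise returns nullptr.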
inline d1::task* get_self_recall_task(arena_slot& slot) {
    suppress_unused_warning(slot);
    d1::task* t = nullptr;
#if __TBB_RESUMABLE_TASKS
    suspend_point_type* sp = slot.default_task_dispatcher().m_suspend_point;
    if (sp && sp->m_is_owner_recalled.load(std::memory_order_acquire)) {
        t = &sp->m_resume_task;
        __TBB_ASSERT(sp->m_resume_task.m_target.m_thread_data == nullptr, nullptr);
    }
#endif /* __TBB_RESUMABLE_TASKS */
    return t;
}

// Defined in exception.cpp
/*[[noreturn]]*/ void do_throw_noexcept(void (*throw_exception)()) noexcept;

//------------------------------------------------------------------------
// Suspend point
//------------------------------------------------------------------------
#if __TBB_RESUMABLE_TASKS

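// The body of the special task that switches the thread to another suspend point (m_target).
// See the branch comments below for how the current stack is handed off before the switch.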
inline d1::task* suspend_point_type::resume_task::execute(d1::execution_data& ed) {
    execution_data_ext& ed_ext = static_cast<execution_data_ext&>(ed);

    if (ed_ext.wait_ctx) {
        thread_control_monitor::resume_context monitor_node{{std::uintptr_t(ed_ext.wait_ctx), nullptr}, ed_ext, m_target};
        // The wait_ctx is present only in external_waiter. In that case we leave the current stack
        // in the abandoned state to resume when waiting completes.
        thread_data* td = ed_ext.task_disp->m_thread_data;
        td->set_post_resume_action(task_dispatcher::post_resume_action::register_waiter, &monitor_node);

        thread_control_monitor& wait_list = td->my_arena->get_waiting_threads_monitor();

        if (wait_list.wait([&] { return !ed_ext.wait_ctx->continue_execution(); }, monitor_node)) {
            return nullptr;
        }

        td->clear_post_resume_action();
        r1::resume(ed_ext.task_disp->get_suspend_point());
    } else {
        // If wait_ctx is null, it can only be a worker thread on the outermost level because
        // coroutine_waiter interrupts the bypass loop before the resume_task execution.
        ed_ext.task_disp->m_thread_data->set_post_resume_action(task_dispatcher::post_resume_action::notify,
            ed_ext.task_disp->get_suspend_point());
    }
    // Do not access this task because it might be destroyed
    ed_ext.task_disp->resume(m_target);
    return nullptr;
}

inline suspend_point_type::suspend_point_type(arena* a, size_t stack_size, task_dispatcher& task_disp)
    : m_arena(a)
    , m_random(this)
    , m_co_context(stack_size, &task_disp)
    , m_resume_task(task_disp)
{
    assert_pointer_valid(m_arena);
    assert_pointer_valid(m_arena->my_default_ctx);
    task_accessor::context(m_resume_task) = m_arena->my_default_ctx;
    task_accessor::isolation(m_resume_task) = no_isolation;
    // Initialize the itt_caller for the context of the resume task.
    // It will be bound to the stack of the first suspend call.
    task_group_context_impl::bind_to(*task_accessor::context(m_resume_task), task_disp.m_thread_data);
}

#endif /* __TBB_RESUMABLE_TASKS */

//------------------------------------------------------------------------
// Task Dispatcher
//------------------------------------------------------------------------
inline task_dispatcher::task_dispatcher(arena* a) {
    m_execute_data_ext.context = a->my_default_ctx;
    m_execute_data_ext.task_disp = this;
}

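// Checks whether enough stack space remains for stealing: the address of a local anchor
// is compared against the precomputed m_stealing_threshold.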
inline bool task_dispatcher::can_steal() {
    __TBB_ASSERT(m_stealing_threshold != 0, nullptr);
    stack_anchor_type anchor{};
    return reinterpret_cast<std::uintptr_t>(&anchor) > m_stealing_threshold;
}

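// Polls the mailbox, giving priority to critical work: a pending critical task is returned
// first; otherwise a task mailed via the task-to-thread affinity mechanism is taken.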
inline d1::task* task_dispatcher::get_inbox_or_critical_task(
    execution_data_ext& ed, mail_inbox& inbox, isolation_type isolation, bool critical_allowed)
{
    if (inbox.empty())
        return nullptr;
    d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
    if (result)
        return result;
    // Check if there are tasks mailed to this thread via the task-to-thread affinity mechanism.
    result = get_mailbox_task(inbox, ed, isolation);
    // There is a race with a thread adding a new task (possibly with suitable isolation)
    // to our mailbox, so the conditions below might result in a false positive.
    // Then set_is_idle(false) allows that task to be stolen; it's OK.
    if (isolation != no_isolation && !result && !inbox.empty() && inbox.is_idle_state(true)) {
        // We have proxy tasks in our mailbox but the isolation blocks their execution.
        // So publish the proxy tasks in the mailbox to make them available for stealing from the owner's task pool.
        inbox.set_is_idle(false);
    }
    return result;
}

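// The same priority rule for task streams: a pending critical task wins; otherwise a task
// is popped from the given stream (resume or FIFO).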
inline d1::task* task_dispatcher::get_stream_or_critical_task(
    execution_data_ext& ed, arena& a, task_stream<front_accessor>& stream, unsigned& hint,
    isolation_type isolation, bool critical_allowed)
{
    if (stream.empty())
        return nullptr;
    d1::task* result = get_critical_task(nullptr, ed, isolation, critical_allowed);
    if (result)
        return result;
    return a.get_stream_task(stream, hint);
}

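// Steals a task from a random slot of the arena. Even on a successful steal, a pending
// critical task takes priority: get_critical_task re-spawns the stolen task and returns
// the critical one instead.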
inline d1::task* task_dispatcher::steal_or_get_critical(
    execution_data_ext& ed, arena& a, unsigned arena_index, FastRandom& random,
    isolation_type isolation, bool critical_allowed)
{
    if (d1::task* t = a.steal_task(arena_index, random, ed, isolation)) {
        ed.context = task_accessor::context(*t);
        ed.isolation = task_accessor::isolation(*t);
        return get_critical_task(t, ed, isolation, critical_allowed);
    }
    return nullptr;
}

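// The nonlocal task retrieval loop: polls the mailbox, the resume and FIFO streams, and
// other arena slots (stealing) until a task is found or the waiter decides to leave.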
template <bool ITTPossible, typename Waiter>
d1::task* task_dispatcher::receive_or_steal_task(
    thread_data& tls, execution_data_ext& ed, Waiter& waiter, isolation_type isolation,
    bool fifo_allowed, bool critical_allowed)
{
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    // Task to return
    d1::task* t = nullptr;
    // Get tls data (again)
    arena& a = *tls.my_arena;
    arena_slot& slot = *tls.my_arena_slot;
    unsigned arena_index = tls.my_arena_index;
    mail_inbox& inbox = tls.my_inbox;
    task_stream<front_accessor>& resume_stream = a.my_resume_task_stream;
    unsigned& resume_hint = slot.hint_for_resume_stream;
    task_stream<front_accessor>& fifo_stream = a.my_fifo_task_stream;
    unsigned& fifo_hint = slot.hint_for_fifo_stream;

    waiter.reset_wait();
    // The thread is in the idle state now
    inbox.set_is_idle(true);

    bool stealing_is_allowed = can_steal();

    // Stealing loop: mailbox / enqueued tasks / other slots
    for (;;) {
        __TBB_ASSERT(t == nullptr, nullptr);
        // Check if the resource manager requires our arena to relinquish some threads.
        // For the external thread, the idle state is restored to true after the dispatch loop.
        if (!waiter.continue_execution(slot, t)) {
            __TBB_ASSERT(t == nullptr, nullptr);
            break;
        }
        // Start searching
        if (t != nullptr) {
            // continue_execution returned a task
        }
        else if ((t = get_inbox_or_critical_task(ed, inbox, isolation, critical_allowed))) {
            // Successfully got a task from the mailbox or a critical task
        }
        else if ((t = get_stream_or_critical_task(ed, a, resume_stream, resume_hint, isolation, critical_allowed))) {
            // Successfully got a resume or critical task
        }
        else if (fifo_allowed && isolation == no_isolation
                 && (t = get_stream_or_critical_task(ed, a, fifo_stream, fifo_hint, isolation, critical_allowed))) {
            // Checked the starvation-resistant stream. Only allowed at the outermost dispatch level without isolation.
        }
        else if (stealing_is_allowed
                 && (t = steal_or_get_critical(ed, a, arena_index, tls.my_random, isolation, critical_allowed))) {
            // Stole a task from a random arena slot
        }
        else {
            t = get_critical_task(t, ed, isolation, critical_allowed);
        }

        if (t != nullptr) {
            ed.context = task_accessor::context(*t);
            ed.isolation = task_accessor::isolation(*t);
            a.my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);
            break; // Stealing success, end of the stealing attempt
        }
        // Nothing to do, pause a little.
        waiter.pause(slot);
    } // end of the nonlocal task retrieval loop

    __TBB_ASSERT(is_alive(a.my_guard), nullptr);
    if (inbox.is_idle_state(true)) {
        inbox.set_is_idle(false);
    }
    return t;
}

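// The main dispatch loop: executes the given task and everything it bypasses, then drains
// the local task pool, falling back to receive_or_steal_task when the pool is exhausted.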
template <bool ITTPossible, typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    assert_pointer_valid(m_thread_data);
    __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);

    // Guard the outer/default execution state
    struct dispatch_loop_guard {
        task_dispatcher& task_disp;
        execution_data_ext old_execute_data_ext;
        properties old_properties;

        ~dispatch_loop_guard() {
            task_disp.m_execute_data_ext = old_execute_data_ext;
            task_disp.m_properties = old_properties;

            __TBB_ASSERT(task_disp.m_thread_data && governor::is_thread_data_set(task_disp.m_thread_data), nullptr);
            __TBB_ASSERT(task_disp.m_thread_data->my_task_dispatcher == &task_disp, nullptr);
        }
    } dl_guard{ *this, m_execute_data_ext, m_properties };

    // The context guard to track the FP settings and itt tasks.
    context_guard_helper</*report_tasks=*/ITTPossible> context_guard;

    // Current isolation context
    const isolation_type isolation = dl_guard.old_execute_data_ext.isolation;

    // Critical work inflection point. Once turned false, the current execution context has taken
    // a critical task on the previous stack frame and cannot take more until that critical path is
    // finished.
    bool critical_allowed = dl_guard.old_properties.critical_task_allowed;

    // Extended execution data that is used for dispatching.
    // The base version is passed to the task::execute method.
    execution_data_ext& ed = m_execute_data_ext;
    ed.context = t ? task_accessor::context(*t) : nullptr;
    ed.original_slot = m_thread_data->my_arena_index;
    ed.affinity_slot = d1::no_slot;
    ed.task_disp = this;
    ed.wait_ctx = waiter.wait_ctx();

    m_properties.outermost = false;
    m_properties.fifo_tasks_allowed = false;

    t = get_critical_task(t, ed, isolation, critical_allowed);
    if (t && m_thread_data->my_inbox.is_idle_state(true)) {
        // The thread has work to do. Therefore, mark its inbox as not idle so that
        // affinitized tasks can be stolen from it.
        m_thread_data->my_inbox.set_is_idle(false);
    }

    // Infinite exception loop
    for (;;) {
        try {
            // Main execution loop
            do {
                // We assume that bypass tasks are from the same task group.
                context_guard.set_ctx(ed.context);
                // The inner level evaluates tasks coming from nesting loops and those returned
                // by just executed tasks (bypassing spawn or enqueue calls).
                while (t != nullptr) {
                    assert_task_valid(t);
                    assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
                    __TBB_ASSERT(ed.context->my_state == d1::task_group_context::state::bound ||
                                 ed.context->my_state == d1::task_group_context::state::isolated, nullptr);
                    __TBB_ASSERT(m_thread_data->my_inbox.is_idle_state(false), nullptr);
                    __TBB_ASSERT(task_accessor::is_resume_task(*t) || isolation == no_isolation || isolation == ed.isolation, nullptr);
                    // Check for a premature leave
                    if (Waiter::postpone_execution(*t)) {
                        __TBB_ASSERT(task_accessor::is_resume_task(*t) && dl_guard.old_properties.outermost,
                            "Currently, the bypass loop can be interrupted only for a resume task on the outermost level");
                        return t;
                    }
                    // Copy itt_caller to the stack because the context might be destroyed after t->execute.
                    void* itt_caller = ed.context->my_itt_caller;
                    suppress_unused_warning(itt_caller);

                    ITT_CALLEE_ENTER(ITTPossible, t, itt_caller);

                    if (ed.context->is_group_execution_cancelled()) {
                        t = t->cancel(ed);
                    } else {
                        t = t->execute(ed);
                    }

                    ITT_CALLEE_LEAVE(ITTPossible, itt_caller);

                    // The task affinity in the execution data is set for affinitized tasks.
                    // So drop it after the task execution.
                    ed.affinity_slot = d1::no_slot;
                    // Reset the task owner id for the bypassed task
                    ed.original_slot = m_thread_data->my_arena_index;
                    t = get_critical_task(t, ed, isolation, critical_allowed);
                }
                __TBB_ASSERT(m_thread_data && governor::is_thread_data_set(m_thread_data), nullptr);
                __TBB_ASSERT(m_thread_data->my_task_dispatcher == this, nullptr);
                // When refactoring, pay attention that m_thread_data can be changed after t->execute()
                __TBB_ASSERT(m_thread_data->my_arena_slot != nullptr, nullptr);
                arena_slot& slot = *m_thread_data->my_arena_slot;
                if (!waiter.continue_execution(slot, t)) {
                    break;
                }
                // Retrieve a task from the local task pool
                if (t || (slot.is_task_pool_published() && (t = slot.get_task(ed, isolation)))) {
                    __TBB_ASSERT(ed.original_slot == m_thread_data->my_arena_index, nullptr);
                    ed.context = task_accessor::context(*t);
                    ed.isolation = task_accessor::isolation(*t);
                    continue;
                }
                // Retrieve a task from global sources
                t = receive_or_steal_task<ITTPossible>(
                    *m_thread_data, ed, waiter, isolation, dl_guard.old_properties.fifo_tasks_allowed,
                    critical_allowed
                );
            } while (t != nullptr); // main dispatch loop
            break; // Exit the exception loop
        } catch (...) {
            if (global_control::active_value(global_control::terminate_on_exception) == 1) {
                do_throw_noexcept([] { throw; });
            }
            if (ed.context->cancel_group_execution()) {
                /* We are the first to signal cancellation, so store the exception that caused it. */
                ed.context->my_exception.store(tbb_exception_ptr::allocate(), std::memory_order_release);
            }
        }
    } // Infinite exception loop
    __TBB_ASSERT(t == nullptr, nullptr);

#if __TBB_RESUMABLE_TASKS
    if (dl_guard.old_properties.outermost) {
        recall_point();
    }
#endif /* __TBB_RESUMABLE_TASKS */

    return nullptr;
}

#if __TBB_RESUMABLE_TASKS
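// Called when leaving the outermost dispatch level. If this dispatcher is not the slot's
// default one, the thread is running on a borrowed coroutine stack: it notifies about its
// suspend point and suspends, allowing the stack to be returned to its owner.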
inline void task_dispatcher::recall_point() {
    if (this != &m_thread_data->my_arena_slot->default_task_dispatcher()) {
        __TBB_ASSERT(m_suspend_point != nullptr, nullptr);
        __TBB_ASSERT(m_suspend_point->m_is_owner_recalled.load(std::memory_order_relaxed) == false, nullptr);

        m_thread_data->set_post_resume_action(post_resume_action::notify, get_suspend_point());
        internal_suspend();

        if (m_thread_data->my_inbox.is_idle_state(true)) {
            m_thread_data->my_inbox.set_is_idle(false);
        }
    }
}
#endif /* __TBB_RESUMABLE_TASKS */

#if __TBB_PREVIEW_CRITICAL_TASKS
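// Checks the arena's critical task stream. If a critical task is found, the task passed in
// (if any) is re-spawned and the critical task is returned in its place; nested critical
// tasks on the same stack are disallowed via m_properties.critical_task_allowed.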
inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext& ed, isolation_type isolation, bool critical_allowed) {
    __TBB_ASSERT(critical_allowed || !m_properties.critical_task_allowed, nullptr);

    if (!critical_allowed) {
        // The stack is already in the process of critical path execution. Cannot take more
        // critical work until we finish with the current one.
        __TBB_ASSERT(!m_properties.critical_task_allowed, nullptr);
        return t;
    }

    assert_pointers_valid(m_thread_data, m_thread_data->my_arena, m_thread_data->my_arena_slot);
    thread_data& td = *m_thread_data;
    arena& a = *td.my_arena;
    arena_slot& slot = *td.my_arena_slot;

    d1::task* crit_t = a.get_critical_task(slot.hint_for_critical_stream, isolation);
    if (crit_t != nullptr) {
        assert_task_valid(crit_t);
        if (t != nullptr) {
            assert_pointer_valid</*alignment = */alignof(void*)>(ed.context);
            r1::spawn(*t, *ed.context);
        }
        ed.context = task_accessor::context(*crit_t);
        ed.isolation = task_accessor::isolation(*crit_t);

        // We cannot execute more than one critical task on the same stack.
        // In other words, we prevent nested critical tasks.
        m_properties.critical_task_allowed = false;

        // TODO: add a test that the observer is called when a critical task is taken.
        a.my_observers.notify_entry_observers(td.my_last_observer, td.my_is_worker);
        t = crit_t;
    } else {
        // We were unable to find critical work in the queue. Allow inspecting the queue in nested
        // invocations. Handles the case when a critical task has just been completed.
        m_properties.critical_task_allowed = true;
    }
    return t;
}
#else
inline d1::task* task_dispatcher::get_critical_task(d1::task* t, execution_data_ext&, isolation_type, bool /*critical_allowed*/) {
    return t;
}
#endif

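// Pops task proxies from the mailbox until one yields a task. Proxies whose task has
// already been taken from the task pool are destroyed here.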
inline d1::task* task_dispatcher::get_mailbox_task(mail_inbox& my_inbox, execution_data_ext& ed, isolation_type isolation) {
    while (task_proxy* const tp = my_inbox.pop(isolation)) {
        if (d1::task* result = tp->extract_task<task_proxy::mailbox_bit>()) {
            ed.original_slot = (unsigned short)(-2);
            ed.affinity_slot = ed.task_disp->m_thread_data->my_arena_index;
            return result;
        }
        // We have exclusive access to the proxy, and can destroy it.
        tp->allocator.delete_object(tp, ed);
    }
    return nullptr;
}

template <typename Waiter>
d1::task* task_dispatcher::local_wait_for_all(d1::task* t, Waiter& waiter) {
    if (governor::is_itt_present()) {
        return local_wait_for_all</*ITTPossible = */ true>(t, waiter);
    } else {
        return local_wait_for_all</*ITTPossible = */ false>(t, waiter);
    }
}

} // namespace r1
} // namespace detail
} // namespace tbb

#endif // _TBB_task_dispatcher_H