xref: /oneTBB/src/tbb/task_dispatcher.cpp (revision fa3268c3)
/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "waiters.h"

namespace tbb {
namespace detail {
namespace r1 {

static inline void spawn_and_notify(d1::task& t, arena_slot* slot, arena* a) {
    slot->spawn(t);
    a->advertise_new_work<arena::work_spawned>();
    // TODO: TBB_REVAMP_TODO slot->assert_task_pool_valid();
}

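// Spawn a task into the calling thread's local task pool: bind ctx to this thread,
// record the context and the current isolation tag in the task, then push it and
// advertise new work in the arena.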
void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    // Capture current context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    spawn_and_notify(t, slot, a);
}

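// Spawn with an affinity hint. If the requested slot differs from the caller's slot,
// the task is wrapped into a task_proxy that is registered in two places at once:
// the sender's local task pool and the destination slot's mailbox. Whichever thread
// claims the proxy first executes the real task; the other side finds an already
// emptied proxy and discards it.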
void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    execution_data_ext& ed = tls->my_task_dispatcher->m_execute_data_ext;

    // Capture context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = ed.isolation;

    if ( id != d1::no_slot && id != tls->my_arena_index ) {
        // Allocate proxy task
        d1::small_object_allocator alloc{};
        auto proxy = alloc.new_object<task_proxy>(static_cast<d1::execution_data&>(ed));
        // Mark as a proxy
        task_accessor::set_proxy_trait(*proxy);
        // Mark isolation for the proxy task
        task_accessor::isolation(*proxy) = ed.isolation;
        // Deallocation hint (tls) from the task allocator
        proxy->allocator = alloc;
        proxy->slot = id;
        proxy->outbox = &a->mailbox(id);
        // Mark proxy as present in both locations (sender's task pool and destination mailbox)
        proxy->task_and_tag = intptr_t(&t) | task_proxy::location_mask;
        // Mail the proxy - after this point t may be destroyed by another thread at any moment.
        proxy->outbox->push(proxy);
        // Spawn proxy to the local task pool
        spawn_and_notify(*proxy, slot, a);
    } else {
        spawn_and_notify(t, slot, a);
    }
}

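// Submit a task into an arbitrary arena, possibly from a thread that does not belong to it.
// If the calling thread is already attached to the target arena, the task goes into its own
// slot (or into the critical task stream when as_critical is set and critical tasks are
// enabled). Otherwise the task is pushed into one of the arena's shared streams via a lane
// selector, so the caller does not have to join the arena.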
void __TBB_EXPORTED_FUNC submit(d1::task& t, d1::task_group_context& ctx, arena* a, std::uintptr_t as_critical) {
    suppress_unused_warning(as_critical);
    assert_pointer_valid(a);
    thread_data& tls = *governor::get_thread_data();

    // TODO revamp: for each use case investigate the necessity of making this call
    task_group_context_impl::bind_to(ctx, &tls);
    task_accessor::context(t) = &ctx;
    // TODO revamp: consider respecting task isolation if this call is made by an external thread
    task_accessor::isolation(t) = tls.my_task_dispatcher->m_execute_data_ext.isolation;

    // TODO: consider code refactoring when the lane selection mechanism is unified.

    if ( tls.is_attached_to(a) ) {
        arena_slot* slot = tls.my_arena_slot;
#if __TBB_PREVIEW_CRITICAL_TASKS
        if( as_critical ) {
            a->my_critical_task_stream.push( &t, subsequent_lane_selector(slot->critical_hint()) );
        } else
#endif
        {
            slot->spawn(t);
        }
    } else {
        random_lane_selector lane_selector{tls.my_random};
#if !__TBB_PREVIEW_CRITICAL_TASKS
        suppress_unused_warning(as_critical);
#else
        if ( as_critical ) {
            a->my_critical_task_stream.push( &t, lane_selector );
        } else
#endif
        {
            // Avoid joining an arena the thread is not currently attached to.
            a->my_fifo_task_stream.push( &t, lane_selector );
        }
    }
    // It is assumed that some thread will explicitly wait in the arena the task is submitted
    // into; therefore, there is no need to enable mandatory concurrency here.
    a->advertise_new_work<arena::work_spawned>();
}

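// Execute task t and block until wait_ctx is released. The task itself is tagged with
// t_ctx, while w_ctx is the context associated with the wait (used for cancellation and
// exception propagation back to the caller).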
void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    task_accessor::context(t) = &t_ctx;
    task_dispatcher::execute_and_wait(&t, wait_ctx, w_ctx);
}

void __TBB_EXPORTED_FUNC wait(d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Enter the task dispatch loop without a task
    task_dispatcher::execute_and_wait(nullptr, wait_ctx, w_ctx);
}

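// Return the arena slot index recorded in the given execution_data, or the calling
// thread's own slot index when ed is nullptr; a thread without initialized thread data
// reports -1.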
d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data* ed) {
    if (ed) {
        const execution_data_ext* ed_ext = static_cast<const execution_data_ext*>(ed);
        assert_pointers_valid(ed_ext->task_disp, ed_ext->task_disp->m_thread_data);
        return ed_ext->task_disp->m_thread_data->my_arena_index;
    } else {
        thread_data* td = governor::get_thread_data_if_initialized();
        return td ? int(td->my_arena_index) : -1;
    }
}

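// Return the context of the task the calling thread is currently executing, or nullptr
// when the thread sits at the outermost dispatch level and is not running any task.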
d1::task_group_context* __TBB_EXPORTED_FUNC current_context() {
    thread_data* td = governor::get_thread_data();
    assert_pointers_valid(td, td->my_task_dispatcher);

    task_dispatcher* task_disp = td->my_task_dispatcher;
    if (task_disp->m_properties.outermost) {
        // No task is being executed, so there is no execute_data.
        return nullptr;
    } else {
        return td->my_task_dispatcher->m_execute_data_ext.context;
    }
}

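// Core of the blocking wait: optionally bind and execute the given task, then run the
// dispatch loop with an external_waiter until wait_ctx is released. Any exception
// captured in w_ctx is rethrown to the caller afterwards.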
void task_dispatcher::execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Get the associated task dispatcher
    thread_data* tls = governor::get_thread_data();
    __TBB_ASSERT(tls->my_task_dispatcher != nullptr, nullptr);
    task_dispatcher& local_td = *tls->my_task_dispatcher;

    // TODO: factor out the binding to execute_and_wait_impl
    if (t) {
        task_group_context_impl::bind_to(*task_accessor::context(*t), tls);
        // Propagate the isolation to the task executed without spawn.
        task_accessor::isolation(*t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    }

    // Wait on a special object tied to the waiting thread.
    external_waiter waiter{ *tls->my_arena, wait_ctx };
    t = local_td.local_wait_for_all(t, waiter);
    __TBB_ASSERT_EX(t == nullptr, "External waiter must not leave dispatch loop with a task");

    // The external thread must not leave the dispatch loop with its inbox in the idle state
    if (local_td.m_thread_data->my_inbox.is_idle_state(true)) {
        local_td.m_thread_data->my_inbox.set_is_idle(false);
    }

    if (w_ctx.my_exception) {
        __TBB_ASSERT(w_ctx.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
        w_ctx.my_exception->throw_self();
    }
}

#if __TBB_RESUMABLE_TASKS

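// Entry point of a coroutine's dispatch loop (resumable tasks). On Windows the routine
// receives the task_dispatcher address as a single void*; on other platforms the start
// routine only takes unsigned arguments, so the address arrives split into two 32-bit
// halves and is reassembled below before entering the loop.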
#if _WIN32
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* addr) noexcept
#else
/* [[noreturn]] */ void co_local_wait_for_all(unsigned hi, unsigned lo) noexcept
#endif
{
#if !_WIN32
    std::uintptr_t addr = lo;
    __TBB_ASSERT(sizeof(addr) == 8 || hi == 0, nullptr);
    addr += std::uintptr_t(std::uint64_t(hi) << 32);
#endif
    task_dispatcher& task_disp = *reinterpret_cast<task_dispatcher*>(addr);
    assert_pointers_valid(task_disp.m_thread_data, task_disp.m_thread_data->my_arena);
    task_disp.set_stealing_threshold(task_disp.m_thread_data->my_arena->calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    task_disp.co_local_wait_for_all();
    // This code is unreachable
}

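// Dispatch loop that runs on the coroutine's own stack. It first performs the post-resume
// action (the user callback passed to tbb::task::suspend), then repeatedly waits for work;
// each time a resume_task comes back it schedules cleanup of this dispatcher and switches
// to the target suspend point. The coroutine may be reused, so the function never returns.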
/* [[noreturn]] */ void task_dispatcher::co_local_wait_for_all() noexcept {
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    assert_pointer_valid(m_thread_data);

    // Basically calls the user callback passed to the tbb::task::suspend function
    m_thread_data->do_post_resume_action();

    // Endless loop here because the coroutine could be reused
    for (;;) {
        arena* a = m_thread_data->my_arena;
        coroutine_waiter waiter(*a);
        d1::task* resume_task = local_wait_for_all(nullptr, waiter);
        assert_task_valid(resume_task);
        __TBB_ASSERT(this == m_thread_data->my_task_dispatcher, nullptr);

        m_thread_data->set_post_resume_action(thread_data::post_resume_action::cleanup, this);
        resume(static_cast<suspend_point_type::resume_task*>(resume_task)->m_target);
    }
    // This code is unreachable
}

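// Lazily create the suspend point that represents this dispatcher for tbb::task::suspend
// and resume. A stack size of 0 attaches the suspend point to the stack the dispatcher is
// already running on instead of allocating a new coroutine stack.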
d1::suspend_point task_dispatcher::get_suspend_point() {
    if (m_suspend_point == nullptr) {
        assert_pointer_valid(m_thread_data);
        // 0 means that we attach this task dispatcher to the current stack
        init_suspend_point(m_thread_data->my_arena, 0);
    }
    assert_pointer_valid(m_suspend_point);
    return m_suspend_point;
}

void task_dispatcher::init_suspend_point(arena* a, std::size_t stack_size) {
    __TBB_ASSERT(m_suspend_point == nullptr, nullptr);
    m_suspend_point = new(cache_aligned_allocate(sizeof(suspend_point_type)))
        suspend_point_type(a, stack_size, *this);
}
#endif /* __TBB_RESUMABLE_TASKS */

} // namespace r1
} // namespace detail
} // namespace tbb