/*
    Copyright (c) 2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "waiters.h"

namespace tbb {
namespace detail {
namespace r1 {

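// Helper: push the task into the calling thread's local task pool and notify the
// arena that new work has arrived, so that sleeping workers can be woken up.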
static inline void spawn_and_notify(d1::task& t, arena_slot* slot, arena* a) {
    slot->spawn(t);
    a->advertise_new_work<arena::work_spawned>();
    // TODO: TBB_REVAMP_TODO slot->assert_task_pool_valid();
}

void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    // Capture current context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    spawn_and_notify(t, slot, a);
}

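// Spawn with an affinity hint: if the requested slot differs from the calling
// thread's own slot, the task is wrapped into a task_proxy that is both mailed
// to the destination slot's mailbox and spawned into the local task pool;
// whichever thread claims the proxy first gets to execute the task.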
void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;

    // Capture the context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    isolation_type isolation = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    task_accessor::isolation(t) = isolation;

    if ( id != d1::no_slot && id != tls->my_arena_index ) {
        // Allocate the proxy task
        d1::small_object_pool* alloc{};
        auto object_pool = tls->my_small_object_pool;
        auto proxy = static_cast<task_proxy*>(object_pool->allocate_impl(alloc, sizeof(task_proxy)));
        // Mark it as a proxy
        task_accessor::set_proxy_trait(*proxy);
        // Mark isolation for the proxy task
        task_accessor::isolation(*proxy) = isolation;
        // Remember the deallocation hint (tls) from the task allocator
        proxy->allocator = alloc;
        proxy->slot = id;
        proxy->outbox = &a->mailbox(id);
        // Mark the proxy as present in both locations (the sender's task pool and the destination mailbox)
        proxy->task_and_tag = intptr_t(&t) | task_proxy::location_mask;
        // Mail the proxy - after this point t may be destroyed by another thread at any moment.
        proxy->outbox->push(proxy);
        // Spawn the proxy into the local task pool
        spawn_and_notify(*proxy, slot, a);
    } else {
        spawn_and_notify(t, slot, a);
    }
}

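// Submit a task into the specified arena. A thread attached to that arena uses
// its local task pool (or the critical task stream); a thread that is not
// attached pushes into one of the arena's shared streams instead, so that it
// does not have to join the arena just to submit work.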
void __TBB_EXPORTED_FUNC submit(d1::task& t, d1::task_group_context& ctx, arena* a, std::uintptr_t as_critical) {
    suppress_unused_warning(as_critical);
    assert_pointer_valid(a);
    thread_data& tls = *governor::get_thread_data();

    // TODO revamp: for each use case, investigate the necessity of making this call
    task_group_context_impl::bind_to(ctx, &tls);
    task_accessor::context(t) = &ctx;
    // TODO revamp: consider respecting task isolation if this call is made by an external thread
    task_accessor::isolation(t) = tls.my_task_dispatcher->m_execute_data_ext.isolation;

    // TODO: consider code refactoring when the lane selection mechanism is unified.

    if ( tls.is_attached_to(a) ) {
        arena_slot* slot = tls.my_arena_slot;
#if __TBB_PREVIEW_CRITICAL_TASKS
        if( as_critical ) {
            a->my_critical_task_stream.push( &t, subsequent_lane_selector(slot->critical_hint()) );
        } else
#endif
        {
            slot->spawn(t);
        }
    } else {
        random_lane_selector lane_selector{tls.my_random};
#if !__TBB_PREVIEW_CRITICAL_TASKS
        suppress_unused_warning(as_critical);
#else
        if ( as_critical ) {
            a->my_critical_task_stream.push( &t, lane_selector );
        } else
#endif
        {
            // Avoid joining an arena the thread is not currently in.
            a->my_fifo_task_stream.push( &t, lane_selector );
        }
    }
    // It is assumed that some thread will explicitly wait in the arena the task is
    // submitted to. Therefore, there is no need to utilize mandatory concurrency here.
    a->advertise_new_work<arena::work_spawned>();
}

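// Execute the given task in the calling thread's dispatch loop and block until
// wait_ctx is released (w_ctx is the context the waiting is associated with).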
void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    task_accessor::context(t) = &t_ctx;
    task_dispatcher::execute_and_wait(&t, wait_ctx, w_ctx);
}

void __TBB_EXPORTED_FUNC wait(d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Enter the task dispatch loop without a task
    task_dispatcher::execute_and_wait(nullptr, wait_ctx, w_ctx);
}

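// Return the arena slot index of the calling thread, or -1 (no slot) if the
// thread data has not been initialized yet.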
d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data* ed) {
    if (ed) {
        const execution_data_ext* ed_ext = static_cast<const execution_data_ext*>(ed);
        assert_pointers_valid(ed_ext->task_disp, ed_ext->task_disp->m_thread_data);
        return ed_ext->task_disp->m_thread_data->my_arena_index;
    } else {
        thread_data* td = governor::get_thread_data_if_initialized();
        return td ? int(td->my_arena_index) : -1;
    }
}

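// Return the context of the task currently executed by the calling thread, or
// nullptr when the thread is at the outermost dispatch level (no task running).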
d1::task_group_context* __TBB_EXPORTED_FUNC current_context() {
    thread_data* td = governor::get_thread_data();
    assert_pointers_valid(td, td->my_task_dispatcher);

    task_dispatcher* task_disp = td->my_task_dispatcher;
    if (task_disp->m_properties.outermost) {
        // No task is being executed, so there is no execute_data.
        return nullptr;
    } else {
        return td->my_task_dispatcher->m_execute_data_ext.context;
    }
}

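// Dispatch-loop entry point for external (master) threads: optionally execute t
// first, then process available tasks until wait_ctx is released.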
void task_dispatcher::execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Get the associated task dispatcher
    thread_data* tls = governor::get_thread_data();
    __TBB_ASSERT(tls->my_task_dispatcher != nullptr, nullptr);
    task_dispatcher& local_td = *tls->my_task_dispatcher;

    // TODO: factor out the binding to execute_and_wait_impl
    if (t) {
        task_group_context_impl::bind_to(*task_accessor::context(*t), tls);
        // Propagate the isolation to the task executed without being spawned.
        task_accessor::isolation(*t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    }

    // Wait on a special object tied to the waiting thread.
    external_waiter waiter{ *tls->my_arena, wait_ctx };
    t = local_td.local_wait_for_all(t, waiter);
    __TBB_ASSERT_EX(t == nullptr, "External waiter must not leave the dispatch loop with a task");

    // An external (master) thread must not exit the dispatch loop in an idle state
    if (local_td.m_thread_data->my_inbox.is_idle_state(true)) {
        local_td.m_thread_data->my_inbox.set_is_idle(false);
    }

    if (w_ctx.my_exception) {
        __TBB_ASSERT(w_ctx.is_group_execution_cancelled(), "A task group context that stores an exception must be cancelled.");
        w_ctx.my_exception->throw_self();
    }
}

#if __TBB_RESUMABLE_TASKS

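// Coroutine entry point of the dispatch loop. On Windows the __stdcall calling
// convention is required because this function is presumably used as a fiber
// start routine (LPFIBER_START_ROUTINE).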
#if _WIN32
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* arg) noexcept
#else
/* [[noreturn]] */ void co_local_wait_for_all(void* arg) noexcept
#endif
{
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    __TBB_ASSERT(arg != nullptr, nullptr);
    task_dispatcher& task_disp = *static_cast<task_dispatcher*>(arg);

    assert_pointers_valid(task_disp.m_thread_data, task_disp.m_thread_data->my_arena);
    task_disp.set_stealing_threshold(task_disp.m_thread_data->my_arena->calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    task_disp.co_local_wait_for_all();
    // This code is unreachable
}

/* [[noreturn]] */ void task_dispatcher::co_local_wait_for_all() noexcept {
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    assert_pointer_valid(m_thread_data);

    // Run the post-resume action (basically, the user callback passed to tbb::task::suspend)
    m_thread_data->do_post_resume_action();

    // Endless loop because the coroutine may be reused
    for (;;) {
        arena* a = m_thread_data->my_arena;
        coroutine_waiter waiter(*a);
        d1::task* resume_task = local_wait_for_all(nullptr, waiter);
        assert_task_valid(resume_task);
        __TBB_ASSERT(this == m_thread_data->my_task_dispatcher, nullptr);

        m_thread_data->set_post_resume_action(thread_data::post_resume_action::cleanup, this);
        resume(static_cast<suspend_point_type::resume_task*>(resume_task)->m_target);
    }
    // This code is unreachable
}

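// Lazily create the suspend point the first time it is requested; subsequent
// calls return the cached pointer.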
d1::suspend_point task_dispatcher::get_suspend_point() {
    if (m_suspend_point == nullptr) {
        assert_pointer_valid(m_thread_data);
        // 0 means that we attach this task dispatcher to the current stack
        init_suspend_point(m_thread_data->my_arena, 0);
    }
    assert_pointer_valid(m_suspend_point);
    return m_suspend_point;
}

void task_dispatcher::init_suspend_point(arena* a, std::size_t stack_size) {
    __TBB_ASSERT(m_suspend_point == nullptr, nullptr);
    m_suspend_point = new(cache_aligned_allocate(sizeof(suspend_point_type)))
        suspend_point_type(a, stack_size, *this);
}
#endif /* __TBB_RESUMABLE_TASKS */
} // namespace r1
} // namespace detail
} // namespace tbb