/*
    Copyright (c) 2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "waiters.h"

namespace tbb {
namespace detail {
namespace r1 {

static inline void spawn_and_notify(d1::task& t, arena_slot* slot, arena* a) {
    slot->spawn(t);
    a->advertise_new_work<arena::work_spawned>();
    // TODO: TBB_REVAMP_TODO slot->assert_task_pool_valid();
}

void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    // Capture current context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    spawn_and_notify(t, slot, a);
}

void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;

    // Capture context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    isolation_type isolation = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    task_accessor::isolation(t) = isolation;

    if ( id != d1::no_slot && id != tls->my_arena_index ) {
        // Allocate proxy task
        d1::small_object_pool* alloc{};
        auto object_pool = tls->my_small_object_pool;
        auto proxy = static_cast<task_proxy*>(object_pool->allocate_impl(alloc, sizeof(task_proxy)));
        // Mark as a proxy
        task_accessor::set_proxy_trait(*proxy);
        // Mark isolation for the proxy task
        task_accessor::isolation(*proxy) = isolation;
        // Deallocation hint (tls) from the task allocator
        proxy->allocator = alloc;
        proxy->slot = id;
        proxy->outbox = &a->mailbox(id);
        // Mark the proxy as present in both locations (sender's task pool and destination mailbox)
        proxy->task_and_tag = intptr_t(&t) | task_proxy::location_mask;
        // Mail the proxy - after this point t may be destroyed by another thread at any moment.
        proxy->outbox->push(proxy);
        // Spawn the proxy to the local task pool
        spawn_and_notify(*proxy, slot, a);
    } else {
        spawn_and_notify(t, slot, a);
    }
}

void __TBB_EXPORTED_FUNC submit(d1::task& t, d1::task_group_context& ctx, arena* a, std::uintptr_t as_critical) {
    suppress_unused_warning(as_critical);
    assert_pointer_valid(a);
    thread_data& tls = *governor::get_thread_data();

    // TODO revamp: for each use case investigate the necessity to make this call
    task_group_context_impl::bind_to(ctx, &tls);
    task_accessor::context(t) = &ctx;
    // TODO revamp: consider respecting task isolation if this call is being made by an external thread
    task_accessor::isolation(t) = tls.my_task_dispatcher->m_execute_data_ext.isolation;

    // TODO: consider code refactoring when lane selection mechanism is unified.
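
    // If the submitting thread already occupies a slot in the target arena, the task goes into
    // that slot's local task pool (or into the critical task stream for critical tasks);
    // otherwise it is pushed into one of the arena's FIFO lanes chosen by a random lane selector.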
    if ( tls.is_attached_to(a) ) {
        arena_slot* slot = tls.my_arena_slot;
#if __TBB_PREVIEW_CRITICAL_TASKS
        if( as_critical ) {
            a->my_critical_task_stream.push( &t, subsequent_lane_selector(slot->critical_hint()) );
        } else
#endif
        {
            slot->spawn(t);
        }
    } else {
        random_lane_selector lane_selector{tls.my_random};
#if !__TBB_PREVIEW_CRITICAL_TASKS
        suppress_unused_warning(as_critical);
#else
        if ( as_critical ) {
            a->my_critical_task_stream.push( &t, lane_selector );
        } else
#endif
        {
            // Avoid joining the arena the thread is not currently in.
            a->my_fifo_task_stream.push( &t, lane_selector );
        }
    }
    // It is assumed that some thread will explicitly wait in the arena the task is submitted
    // into. Therefore, there is no need to utilize mandatory concurrency here.
    a->advertise_new_work<arena::work_spawned>();
}

void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    task_accessor::context(t) = &t_ctx;
    task_dispatcher::execute_and_wait(&t, wait_ctx, w_ctx);
}

void __TBB_EXPORTED_FUNC wait(d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Enter the task dispatch loop without a task
    task_dispatcher::execute_and_wait(nullptr, wait_ctx, w_ctx);
}

d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data* ed) {
    if (ed) {
        const execution_data_ext* ed_ext = static_cast<const execution_data_ext*>(ed);
        assert_pointers_valid(ed_ext->task_disp, ed_ext->task_disp->m_thread_data);
        return ed_ext->task_disp->m_thread_data->my_arena_index;
    } else {
        thread_data* td = governor::get_thread_data_if_initialized();
        return td ? int(td->my_arena_index) : -1;
    }
}

d1::task_group_context* __TBB_EXPORTED_FUNC current_context() {
    thread_data* td = governor::get_thread_data();
    assert_pointers_valid(td, td->my_task_dispatcher);

    task_dispatcher* task_disp = td->my_task_dispatcher;
    if (task_disp->m_properties.outermost) {
        // No task is being executed, so there is no execute_data.
        return nullptr;
    } else {
        return td->my_task_dispatcher->m_execute_data_ext.context;
    }
}

void task_dispatcher::execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Get the associated task dispatcher
    thread_data* tls = governor::get_thread_data();
    __TBB_ASSERT(tls->my_task_dispatcher != nullptr, nullptr);
    task_dispatcher& local_td = *tls->my_task_dispatcher;

    // TODO: factor out the binding to execute_and_wait_impl
    if (t) {
        task_group_context_impl::bind_to(*task_accessor::context(*t), tls);
        // Propagate the isolation to the task executed without spawn.
        task_accessor::isolation(*t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    }

    // Waiting on a special object tied to the waiting thread.
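    // local_wait_for_all() runs the dispatch loop until wait_ctx is satisfied; an external
    // waiter never hands a task back, as the assertion below checks.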
    external_waiter waiter{ *tls->my_arena, wait_ctx };
    t = local_td.local_wait_for_all(t, waiter);
    __TBB_ASSERT_EX(t == nullptr, "External waiter must not leave dispatch loop with a task");

    // A master (external) thread must not exit the dispatch loop in an idle state
    if (local_td.m_thread_data->my_inbox.is_idle_state(true)) {
        local_td.m_thread_data->my_inbox.set_is_idle(false);
    }

    if (w_ctx.my_exception) {
        __TBB_ASSERT(w_ctx.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
        w_ctx.my_exception->throw_self();
    }
}

#if __TBB_RESUMABLE_TASKS

#if _WIN32
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* arg) noexcept
#else
/* [[noreturn]] */ void co_local_wait_for_all(void* arg) noexcept
#endif
{
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    __TBB_ASSERT(arg != nullptr, nullptr);
    task_dispatcher& task_disp = *static_cast<task_dispatcher*>(arg);

    assert_pointers_valid(task_disp.m_thread_data, task_disp.m_thread_data->my_arena);
    task_disp.set_stealing_threshold(task_disp.m_thread_data->my_arena->calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    task_disp.co_local_wait_for_all();
    // This code is unreachable
}

/* [[noreturn]] */ void task_dispatcher::co_local_wait_for_all() noexcept {
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    assert_pointer_valid(m_thread_data);

    // Basically calls the user callback passed to the tbb::task::suspend function
    m_thread_data->do_post_resume_action();

    // Endless loop here because the coroutine can be reused
    for (;;) {
        arena* a = m_thread_data->my_arena;
        coroutine_waiter waiter(*a);
        d1::task* resume_task = local_wait_for_all(nullptr, waiter);
        assert_task_valid(resume_task);
        __TBB_ASSERT(this == m_thread_data->my_task_dispatcher, nullptr);

        m_thread_data->set_post_resume_action(thread_data::post_resume_action::cleanup, this);
        resume(static_cast<suspend_point_type::resume_task*>(resume_task)->m_target);
    }
    // This code is unreachable
}

d1::suspend_point task_dispatcher::get_suspend_point() {
    if (m_suspend_point == nullptr) {
        assert_pointer_valid(m_thread_data);
        // 0 means that we attach this task dispatcher to the current stack
        init_suspend_point(m_thread_data->my_arena, 0);
    }
    assert_pointer_valid(m_suspend_point);
    return m_suspend_point;
}

void task_dispatcher::init_suspend_point(arena* a, std::size_t stack_size) {
    __TBB_ASSERT(m_suspend_point == nullptr, nullptr);
    m_suspend_point = new(cache_aligned_allocate(sizeof(suspend_point_type)))
        suspend_point_type(a, stack_size, *this);
}
#endif /* __TBB_RESUMABLE_TASKS */

} // namespace r1
} // namespace detail
} // namespace tbb