/*
    Copyright (c) 2020-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "waiters.h"

namespace tbb {
namespace detail {
namespace r1 {

static inline void spawn_and_notify(d1::task& t, arena_slot* slot, arena* a) {
    slot->spawn(t);
    a->advertise_new_work<arena::work_spawned>();
    // TODO: TBB_REVAMP_TODO slot->assert_task_pool_valid();
}

void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    // Capture the current context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    spawn_and_notify(t, slot, a);
}

void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id) {
    thread_data* tls = governor::get_thread_data();
    task_group_context_impl::bind_to(ctx, tls);
    arena* a = tls->my_arena;
    arena_slot* slot = tls->my_arena_slot;
    execution_data_ext& ed = tls->my_task_dispatcher->m_execute_data_ext;

    // Capture the context
    task_accessor::context(t) = &ctx;
    // Mark isolation
    task_accessor::isolation(t) = ed.isolation;

    if ( id != d1::no_slot && id != tls->my_arena_index && id < a->my_num_slots ) {
        // Allocate a proxy task
        d1::small_object_allocator alloc{};
        auto proxy = alloc.new_object<task_proxy>(static_cast<d1::execution_data&>(ed));
        // Mark it as a proxy
        task_accessor::set_proxy_trait(*proxy);
        // Mark isolation for the proxy task
        task_accessor::isolation(*proxy) = ed.isolation;
        // Deallocation hint (tls) from the task allocator
        proxy->allocator = alloc;
        proxy->slot = id;
        proxy->outbox = &a->mailbox(id);
        // Mark the proxy as present in both locations (sender's task pool and destination mailbox)
        proxy->task_and_tag = intptr_t(&t) | task_proxy::location_mask;
        // Mail the proxy - after this point t may be destroyed by another thread at any moment.
        proxy->outbox->push(proxy);
        // Spawn the proxy into the local task pool
        spawn_and_notify(*proxy, slot, a);
    } else {
        spawn_and_notify(t, slot, a);
    }
}

void __TBB_EXPORTED_FUNC submit(d1::task& t, d1::task_group_context& ctx, arena* a, std::uintptr_t as_critical) {
    suppress_unused_warning(as_critical);
    assert_pointer_valid(a);
    thread_data& tls = *governor::get_thread_data();

    // TODO revamp: for each use case investigate the necessity of making this call
    task_group_context_impl::bind_to(ctx, &tls);
    task_accessor::context(t) = &ctx;
    // TODO revamp: consider respecting task isolation if this call is being made by an external thread
    task_accessor::isolation(t) = tls.my_task_dispatcher->m_execute_data_ext.isolation;

    // TODO: consider code refactoring when the lane selection mechanism is unified.
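    // The branch below selects between two delivery paths: a thread already attached to the
    // target arena pushes the task into its own task pool (or, when as_critical is set, into
    // the critical task stream), while a thread that is not attached drops the task into one
    // of the arena's shared streams via a randomly selected lane, so it never has to join the
    // arena just to submit work.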

    if ( tls.is_attached_to(a) ) {
        arena_slot* slot = tls.my_arena_slot;
#if __TBB_PREVIEW_CRITICAL_TASKS
        if ( as_critical ) {
            a->my_critical_task_stream.push( &t, subsequent_lane_selector(slot->critical_hint()) );
        } else
#endif
        {
            slot->spawn(t);
        }
    } else {
        random_lane_selector lane_selector{tls.my_random};
#if !__TBB_PREVIEW_CRITICAL_TASKS
        suppress_unused_warning(as_critical);
#else
        if ( as_critical ) {
            a->my_critical_task_stream.push( &t, lane_selector );
        } else
#endif
        {
            // Avoid joining the arena the thread is not currently in.
            a->my_fifo_task_stream.push( &t, lane_selector );
        }
    }
    // It is assumed that some thread will explicitly wait in the arena the task is submitted
    // into. Therefore, there is no need to utilize mandatory concurrency here.
    a->advertise_new_work<arena::work_spawned>();
}

void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    task_accessor::context(t) = &t_ctx;
    task_dispatcher::execute_and_wait(&t, wait_ctx, w_ctx);
}

void __TBB_EXPORTED_FUNC wait(d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Enter the task dispatch loop without a task
    task_dispatcher::execute_and_wait(nullptr, wait_ctx, w_ctx);
}

d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data* ed) {
    if (ed) {
        const execution_data_ext* ed_ext = static_cast<const execution_data_ext*>(ed);
        assert_pointers_valid(ed_ext->task_disp, ed_ext->task_disp->m_thread_data);
        return ed_ext->task_disp->m_thread_data->my_arena_index;
    } else {
        thread_data* td = governor::get_thread_data_if_initialized();
        return td ? td->my_arena_index : d1::slot_id(-1);
    }
}

d1::task_group_context* __TBB_EXPORTED_FUNC current_context() {
    thread_data* td = governor::get_thread_data();
    assert_pointers_valid(td, td->my_task_dispatcher);

    task_dispatcher* task_disp = td->my_task_dispatcher;
    if (task_disp->m_properties.outermost) {
        // No task is being executed, so there is no execute_data.
        return nullptr;
    } else {
        return td->my_task_dispatcher->m_execute_data_ext.context;
    }
}

void task_dispatcher::execute_and_wait(d1::task* t, d1::wait_context& wait_ctx, d1::task_group_context& w_ctx) {
    // Get the associated task dispatcher
    thread_data* tls = governor::get_thread_data();
    __TBB_ASSERT(tls->my_task_dispatcher != nullptr, nullptr);
    task_dispatcher& local_td = *tls->my_task_dispatcher;

    // TODO: factor out the binding into execute_and_wait_impl
    if (t) {
        task_group_context_impl::bind_to(*task_accessor::context(*t), tls);
        // Propagate the isolation to a task executed without spawn.
        task_accessor::isolation(*t) = tls->my_task_dispatcher->m_execute_data_ext.isolation;
    }

    // Waiting on a special object tied to the waiting thread.
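    // The dispatch loop below executes and steals tasks until the external_waiter observes that
    // wait_ctx has completed; an external thread always comes back here without a task, which is
    // checked by the assertion that follows.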
    external_waiter waiter{ *tls->my_arena, wait_ctx };
    t = local_td.local_wait_for_all(t, waiter);
    __TBB_ASSERT_EX(t == nullptr, "External waiter must not leave dispatch loop with a task");

    // The external thread must not exit the dispatch loop in an idle state
    if (local_td.m_thread_data->my_inbox.is_idle_state(true)) {
        local_td.m_thread_data->my_inbox.set_is_idle(false);
    }

    auto exception = w_ctx.my_exception.load(std::memory_order_acquire);
    if (exception) {
        __TBB_ASSERT(w_ctx.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
        exception->throw_self();
    }
}

#if __TBB_RESUMABLE_TASKS

#if _WIN32
/* [[noreturn]] */ void __stdcall co_local_wait_for_all(void* addr) noexcept
#else
/* [[noreturn]] */ void co_local_wait_for_all(unsigned hi, unsigned lo) noexcept
#endif
{
#if !_WIN32
    // Reassemble the task_dispatcher pointer that was passed to the coroutine entry as two 32-bit halves
    std::uintptr_t addr = lo;
    __TBB_ASSERT(sizeof(addr) == 8 || hi == 0, nullptr);
    addr += std::uintptr_t(std::uint64_t(hi) << 32);
#endif
    task_dispatcher& task_disp = *reinterpret_cast<task_dispatcher*>(addr);
    assert_pointers_valid(task_disp.m_thread_data, task_disp.m_thread_data->my_arena);
    task_disp.set_stealing_threshold(task_disp.m_thread_data->my_arena->calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    task_disp.co_local_wait_for_all();
    // This code is unreachable
}

/* [[noreturn]] */ void task_dispatcher::co_local_wait_for_all() noexcept {
    // Do not create non-trivial objects on the stack of this function. They will never be destroyed.
    assert_pointer_valid(m_thread_data);

    // Basically calls the user callback passed to the tbb::task::suspend function
    m_thread_data->do_post_resume_action();

    // Endless loop here because the coroutine could be reused
    d1::task* resume_task{};
    do {
        arena* a = m_thread_data->my_arena;
        coroutine_waiter waiter(*a);
        resume_task = local_wait_for_all(nullptr, waiter);
        assert_task_valid(resume_task);
        __TBB_ASSERT(this == m_thread_data->my_task_dispatcher, nullptr);

        m_thread_data->set_post_resume_action(thread_data::post_resume_action::cleanup, this);

    } while (resume(static_cast<suspend_point_type::resume_task*>(resume_task)->m_target));
    // This code might be unreachable
}

d1::suspend_point task_dispatcher::get_suspend_point() {
    if (m_suspend_point == nullptr) {
        assert_pointer_valid(m_thread_data);
        // 0 means that we attach this task dispatcher to the current stack
        init_suspend_point(m_thread_data->my_arena, 0);
    }
    assert_pointer_valid(m_suspend_point);
    return m_suspend_point;
}

void task_dispatcher::init_suspend_point(arena* a, std::size_t stack_size) {
    __TBB_ASSERT(m_suspend_point == nullptr, nullptr);
    m_suspend_point = new(cache_aligned_allocate(sizeof(suspend_point_type)))
        suspend_point_type(a, stack_size, *this);
}
#endif /* __TBB_RESUMABLE_TASKS */

} // namespace r1
} // namespace detail
} // namespace tbb