1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef _TBB_waiters_H 18 #define _TBB_waiters_H 19 20 #include "oneapi/tbb/detail/_task.h" 21 #include "scheduler_common.h" 22 #include "arena.h" 23 #include "threading_control.h" 24 25 namespace tbb { 26 namespace detail { 27 namespace r1 { 28 29 inline d1::task* get_self_recall_task(arena_slot& slot); 30 31 class waiter_base { 32 public: my_arena(a)33 waiter_base(arena& a, int yields_multiplier = 1) : my_arena(a), my_backoff(int(a.my_num_slots), yields_multiplier) {} 34 pause()35 bool pause() { 36 if (my_backoff.pause()) { 37 my_arena.out_of_work(); 38 return true; 39 } 40 41 return false; 42 } 43 reset_wait()44 void reset_wait() { 45 my_backoff.reset_wait(); 46 } 47 48 protected: 49 arena& my_arena; 50 stealing_loop_backoff my_backoff; 51 }; 52 53 class outermost_worker_waiter : public waiter_base { 54 public: 55 using waiter_base::waiter_base; 56 continue_execution(arena_slot & slot,d1::task * & t)57 bool continue_execution(arena_slot& slot, d1::task*& t) const { 58 __TBB_ASSERT(t == nullptr, nullptr); 59 60 if (is_worker_should_leave(slot)) { 61 // Leave dispatch loop 62 return false; 63 } 64 65 t = get_self_recall_task(slot); 66 return true; 67 } 68 pause(arena_slot &)69 void pause(arena_slot&) { 70 waiter_base::pause(); 71 } 72 73 wait_ctx()74 d1::wait_context* wait_ctx() { 75 return nullptr; 76 } 77 postpone_execution(d1::task &)78 static bool postpone_execution(d1::task&) { 79 return false; 80 } 81 82 private: 83 using base_type = waiter_base; 84 is_worker_should_leave(arena_slot & slot)85 bool is_worker_should_leave(arena_slot& slot) const { 86 bool is_top_priority_arena = my_arena.is_top_priority(); 87 bool is_task_pool_empty = slot.task_pool.load(std::memory_order_relaxed) == EmptyTaskPool; 88 89 if (is_top_priority_arena) { 90 // Worker in most priority arena do not leave arena, until all work in task_pool is done 91 if (is_task_pool_empty && my_arena.is_recall_requested()) { 92 return true; 93 } 94 } else { 95 if (my_arena.is_recall_requested()) { 96 // If worker has work in task pool, we must notify other threads, 97 // because can appear missed wake up of other threads 98 if (!is_task_pool_empty) { 99 my_arena.advertise_new_work<arena::wakeup>(); 100 } 101 return true; 102 } 103 } 104 105 return false; 106 } 107 }; 108 109 class sleep_waiter : public waiter_base { 110 protected: 111 using waiter_base::waiter_base; 112 113 template <typename Pred> sleep(std::uintptr_t uniq_tag,Pred wakeup_condition)114 void sleep(std::uintptr_t uniq_tag, Pred wakeup_condition) { 115 my_arena.get_waiting_threads_monitor().wait<thread_control_monitor::thread_context>(wakeup_condition, 116 market_context{uniq_tag, &my_arena}); 117 reset_wait(); 118 } 119 }; 120 121 class external_waiter : public sleep_waiter { 122 public: external_waiter(arena & a,d1::wait_context & wo)123 external_waiter(arena& a, d1::wait_context& wo) 124 : sleep_waiter(a, /*yields_multiplier*/10), my_wait_ctx(wo) 125 {} 126 continue_execution(arena_slot & slot,d1::task * & t)127 bool continue_execution(arena_slot& slot, d1::task*& t) const { 128 __TBB_ASSERT(t == nullptr, nullptr); 129 if (!my_wait_ctx.continue_execution()) 130 return false; 131 t = get_self_recall_task(slot); 132 return true; 133 } 134 pause(arena_slot &)135 void pause(arena_slot&) { 136 if (!sleep_waiter::pause()) { 137 return; 138 } 139 140 auto wakeup_condition = [&] { return !my_arena.is_empty() || !my_wait_ctx.continue_execution(); }; 141 142 sleep(std::uintptr_t(&my_wait_ctx), wakeup_condition); 143 } 144 wait_ctx()145 d1::wait_context* wait_ctx() { 146 return &my_wait_ctx; 147 } 148 postpone_execution(d1::task &)149 static bool postpone_execution(d1::task&) { 150 return false; 151 } 152 153 private: 154 d1::wait_context& my_wait_ctx; 155 }; 156 157 #if __TBB_RESUMABLE_TASKS 158 159 class coroutine_waiter : public sleep_waiter { 160 public: 161 using sleep_waiter::sleep_waiter; 162 continue_execution(arena_slot & slot,d1::task * & t)163 bool continue_execution(arena_slot& slot, d1::task*& t) const { 164 __TBB_ASSERT(t == nullptr, nullptr); 165 t = get_self_recall_task(slot); 166 return true; 167 } 168 pause(arena_slot & slot)169 void pause(arena_slot& slot) { 170 if (!sleep_waiter::pause()) { 171 return; 172 } 173 174 suspend_point_type* sp = slot.default_task_dispatcher().m_suspend_point; 175 176 auto wakeup_condition = [&] { return !my_arena.is_empty() || sp->m_is_owner_recalled.load(std::memory_order_relaxed); }; 177 178 sleep(std::uintptr_t(sp), wakeup_condition); 179 } 180 wait_ctx()181 d1::wait_context* wait_ctx() { 182 return nullptr; 183 } 184 postpone_execution(d1::task & t)185 static bool postpone_execution(d1::task& t) { 186 return task_accessor::is_resume_task(t); 187 } 188 }; 189 190 #endif // __TBB_RESUMABLE_TASKS 191 192 } // namespace r1 193 } // namespace detail 194 } // namespace tbb 195 196 #endif // _TBB_waiters_H 197