xref: /oneTBB/src/tbb/waiters.h (revision f71c92ae)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef _TBB_waiters_H
18 #define _TBB_waiters_H
19 
20 #include "oneapi/tbb/detail/_task.h"
21 #include "scheduler_common.h"
22 #include "arena.h"
23 #include "threading_control.h"
24 
25 namespace tbb {
26 namespace detail {
27 namespace r1 {
28 
29 inline d1::task* get_self_recall_task(arena_slot& slot);
30 
31 class waiter_base {
32 public:
my_arena(a)33     waiter_base(arena& a, int yields_multiplier = 1) : my_arena(a), my_backoff(int(a.my_num_slots), yields_multiplier) {}
34 
pause()35     bool pause() {
36         if (my_backoff.pause()) {
37             my_arena.out_of_work();
38             return true;
39         }
40 
41         return false;
42     }
43 
reset_wait()44     void reset_wait() {
45         my_backoff.reset_wait();
46     }
47 
48 protected:
49     arena& my_arena;
50     stealing_loop_backoff my_backoff;
51 };
52 
53 class outermost_worker_waiter : public waiter_base {
54 public:
55     using waiter_base::waiter_base;
56 
continue_execution(arena_slot & slot,d1::task * & t)57     bool continue_execution(arena_slot& slot, d1::task*& t) const {
58         __TBB_ASSERT(t == nullptr, nullptr);
59 
60         if (is_worker_should_leave(slot)) {
61             // Leave dispatch loop
62             return false;
63         }
64 
65         t = get_self_recall_task(slot);
66         return true;
67     }
68 
pause(arena_slot &)69     void pause(arena_slot&) {
70         waiter_base::pause();
71     }
72 
73 
wait_ctx()74     d1::wait_context* wait_ctx() {
75         return nullptr;
76     }
77 
postpone_execution(d1::task &)78     static bool postpone_execution(d1::task&) {
79         return false;
80     }
81 
82 private:
83     using base_type = waiter_base;
84 
is_worker_should_leave(arena_slot & slot)85     bool is_worker_should_leave(arena_slot& slot) const {
86         bool is_top_priority_arena = my_arena.is_top_priority();
87         bool is_task_pool_empty = slot.task_pool.load(std::memory_order_relaxed) == EmptyTaskPool;
88 
89         if (is_top_priority_arena) {
90             // Worker in most priority arena do not leave arena, until all work in task_pool is done
91             if (is_task_pool_empty && my_arena.is_recall_requested()) {
92                 return true;
93             }
94         } else {
95             if (my_arena.is_recall_requested()) {
96                 // If worker has work in task pool, we must notify other threads,
97                 // because can appear missed wake up of other threads
98                 if (!is_task_pool_empty) {
99                     my_arena.advertise_new_work<arena::wakeup>();
100                 }
101                 return true;
102             }
103         }
104 
105         return false;
106     }
107 };
108 
109 class sleep_waiter : public waiter_base {
110 protected:
111     using waiter_base::waiter_base;
112 
113     template <typename Pred>
sleep(std::uintptr_t uniq_tag,Pred wakeup_condition)114     void sleep(std::uintptr_t uniq_tag, Pred wakeup_condition) {
115         my_arena.get_waiting_threads_monitor().wait<thread_control_monitor::thread_context>(wakeup_condition,
116             market_context{uniq_tag, &my_arena});
117         reset_wait();
118     }
119 };
120 
121 class external_waiter : public sleep_waiter {
122 public:
external_waiter(arena & a,d1::wait_context & wo)123     external_waiter(arena& a, d1::wait_context& wo)
124         : sleep_waiter(a, /*yields_multiplier*/10), my_wait_ctx(wo)
125         {}
126 
continue_execution(arena_slot & slot,d1::task * & t)127     bool continue_execution(arena_slot& slot, d1::task*& t) const {
128         __TBB_ASSERT(t == nullptr, nullptr);
129         if (!my_wait_ctx.continue_execution())
130             return false;
131         t = get_self_recall_task(slot);
132         return true;
133     }
134 
pause(arena_slot &)135     void pause(arena_slot&) {
136         if (!sleep_waiter::pause()) {
137             return;
138         }
139 
140         auto wakeup_condition = [&] { return !my_arena.is_empty() || !my_wait_ctx.continue_execution(); };
141 
142         sleep(std::uintptr_t(&my_wait_ctx), wakeup_condition);
143     }
144 
wait_ctx()145     d1::wait_context* wait_ctx() {
146         return &my_wait_ctx;
147     }
148 
postpone_execution(d1::task &)149     static bool postpone_execution(d1::task&) {
150         return false;
151     }
152 
153 private:
154     d1::wait_context& my_wait_ctx;
155 };
156 
157 #if __TBB_RESUMABLE_TASKS
158 
159 class coroutine_waiter : public sleep_waiter {
160 public:
161     using sleep_waiter::sleep_waiter;
162 
continue_execution(arena_slot & slot,d1::task * & t)163     bool continue_execution(arena_slot& slot, d1::task*& t) const {
164         __TBB_ASSERT(t == nullptr, nullptr);
165         t = get_self_recall_task(slot);
166         return true;
167     }
168 
pause(arena_slot & slot)169     void pause(arena_slot& slot) {
170         if (!sleep_waiter::pause()) {
171             return;
172         }
173 
174         suspend_point_type* sp = slot.default_task_dispatcher().m_suspend_point;
175 
176         auto wakeup_condition = [&] { return !my_arena.is_empty() || sp->m_is_owner_recalled.load(std::memory_order_relaxed); };
177 
178         sleep(std::uintptr_t(sp), wakeup_condition);
179     }
180 
wait_ctx()181     d1::wait_context* wait_ctx() {
182         return nullptr;
183     }
184 
postpone_execution(d1::task & t)185     static bool postpone_execution(d1::task& t) {
186         return task_accessor::is_resume_task(t);
187     }
188 };
189 
190 #endif // __TBB_RESUMABLE_TASKS
191 
192 } // namespace r1
193 } // namespace detail
194 } // namespace tbb
195 
196 #endif // _TBB_waiters_H
197