/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB__task_H
#define __TBB__task_H

#include "_config.h"
#include "_assert.h"
#include "_template_helpers.h"
#include "_small_object_pool.h"

#include "../profiling.h"

#include <cstddef>
#include <cstdint>
#include <climits>
#include <utility>
#include <atomic>
#include <mutex>
namespace tbb {
namespace detail {

namespace d1 {
using slot_id = unsigned short;
constexpr slot_id no_slot = slot_id(~0);
constexpr slot_id any_slot = slot_id(~1);

class task;
class wait_context;
class task_group_context;
struct execution_data;
}

namespace r1 {
//! Task spawn/wait entry points
TBB_EXPORT void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx);
TBB_EXPORT void __TBB_EXPORTED_FUNC spawn(d1::task& t, d1::task_group_context& ctx, d1::slot_id id);
TBB_EXPORT void __TBB_EXPORTED_FUNC execute_and_wait(d1::task& t, d1::task_group_context& t_ctx, d1::wait_context&, d1::task_group_context& w_ctx);
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::wait_context&, d1::task_group_context& ctx);
TBB_EXPORT d1::slot_id __TBB_EXPORTED_FUNC execution_slot(const d1::execution_data*);
TBB_EXPORT d1::task_group_context* __TBB_EXPORTED_FUNC current_context();

// Do not place under __TBB_RESUMABLE_TASKS. It is a stub for unsupported platforms.
struct suspend_point_type;
using suspend_callback_type = void(*)(void*, suspend_point_type*);
//! The resumable tasks entry points
TBB_EXPORT void __TBB_EXPORTED_FUNC suspend(suspend_callback_type suspend_callback, void* user_callback);
TBB_EXPORT void __TBB_EXPORTED_FUNC resume(suspend_point_type* tag);
TBB_EXPORT suspend_point_type* __TBB_EXPORTED_FUNC current_suspend_point();
TBB_EXPORT void __TBB_EXPORTED_FUNC notify_waiters(std::uintptr_t wait_ctx_addr);

class thread_data;
class task_dispatcher;
class external_waiter;
struct task_accessor;
struct task_arena_impl;
} // namespace r1

namespace d1 {

class task_arena;
using suspend_point = r1::suspend_point_type*;

#if __TBB_RESUMABLE_TASKS
template <typename F>
static void suspend_callback(void* user_callback, suspend_point sp) {
    // Copy user function to a new stack after the context switch to avoid a race when the previous
    // suspend point is resumed while the user_callback is being called.
    F user_callback_copy = *static_cast<F*>(user_callback);
    user_callback_copy(sp);
}

template <typename F>
void suspend(F f) {
    r1::suspend(&suspend_callback<F>, &f);
}

inline void resume(suspend_point tag) {
    r1::resume(tag);
}
#endif /* __TBB_RESUMABLE_TASKS */
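
// Example (illustrative sketch, not part of this header): suspending the current
// task and handing the suspend point to code that will resume it later. The
// `enqueue_for_later` call is a hypothetical user function.
//
//     d1::suspend([](d1::suspend_point sp) {
//         // Runs after the context switch; the suspended task continues
//         // once some thread calls d1::resume(sp).
//         enqueue_for_later(sp);
//     });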

// TODO: align wait_context on a cache line
class wait_context {
    static constexpr std::uint64_t overflow_mask = ~((1LLU << 32) - 1);

    std::uint64_t m_version_and_traits{1};
    std::atomic<std::uint64_t> m_ref_count{};

    void add_reference(std::int64_t delta) {
        call_itt_task_notify(releasing, this);
        std::uint64_t r = m_ref_count.fetch_add(static_cast<std::uint64_t>(delta)) + static_cast<std::uint64_t>(delta);

        __TBB_ASSERT_EX((r & overflow_mask) == 0, "Overflow is detected");

        if (!r) {
            // Some external or coroutine waiters may be sleeping in the wait list;
            // notify them that the work is done.
            std::uintptr_t wait_ctx_addr = std::uintptr_t(this);
            r1::notify_waiters(wait_ctx_addr);
        }
    }

    bool continue_execution() const {
        std::uint64_t r = m_ref_count.load(std::memory_order_acquire);
        __TBB_ASSERT_EX((r & overflow_mask) == 0, "Overflow is detected");
        return r > 0;
    }

    friend class r1::thread_data;
    friend class r1::task_dispatcher;
    friend class r1::external_waiter;
    friend class task_group;
    friend class task_group_base;
    friend struct r1::task_arena_impl;
    friend struct r1::suspend_point_type;
public:
    // Although the internal reference count is uint64_t, the user interface is limited
    // to uint32_t to preserve a part of the internal reference count for special needs.
    wait_context(std::uint32_t ref_count) : m_ref_count{ref_count} { suppress_unused_warning(m_version_and_traits); }
    wait_context(const wait_context&) = delete;

    ~wait_context() {
        __TBB_ASSERT(!continue_execution(), nullptr);
    }

    void reserve(std::uint32_t delta = 1) {
        add_reference(delta);
    }

    void release(std::uint32_t delta = 1) {
        add_reference(-std::int64_t(delta));
    }
};
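
// Example (illustrative sketch, not part of this header): each pending unit of
// work holds one reference; the last release wakes any sleeping waiters, and
// the destructor asserts that the count has dropped back to zero.
//
//     d1::wait_context wait_ctx{0};
//     wait_ctx.reserve();    // a work item is about to be published
//     /* ... the work item calls wait_ctx.release() when it finishes ... */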

struct execution_data {
    task_group_context* context{};
    slot_id original_slot{};
    slot_id affinity_slot{};
};

inline task_group_context* context(const execution_data& ed) {
    return ed.context;
}

inline slot_id original_slot(const execution_data& ed) {
    return ed.original_slot;
}

inline slot_id affinity_slot(const execution_data& ed) {
    return ed.affinity_slot;
}

inline slot_id execution_slot(const execution_data& ed) {
    return r1::execution_slot(&ed);
}

inline bool is_same_affinity(const execution_data& ed) {
    return affinity_slot(ed) == no_slot || affinity_slot(ed) == execution_slot(ed);
}

inline bool is_stolen(const execution_data& ed) {
    return original_slot(ed) != execution_slot(ed);
}
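
// Example (illustrative sketch, not part of this header): inside task::execute,
// the execution_data argument reveals where the task runs, e.g. whether it was
// stolen from the slot it was spawned on.
//
//     d1::task* my_task::execute(d1::execution_data& ed) {
//         if (d1::is_stolen(ed)) {
//             /* the task migrated to another worker's slot */
//         }
//         return nullptr;
//     }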

inline void spawn(task& t, task_group_context& ctx) {
    call_itt_task_notify(releasing, &t);
    r1::spawn(t, ctx);
}

inline void spawn(task& t, task_group_context& ctx, slot_id id) {
    call_itt_task_notify(releasing, &t);
    r1::spawn(t, ctx, id);
}

inline void execute_and_wait(task& t, task_group_context& t_ctx, wait_context& wait_ctx, task_group_context& w_ctx) {
    r1::execute_and_wait(t, t_ctx, wait_ctx, w_ctx);
    call_itt_task_notify(acquired, &wait_ctx);
    call_itt_task_notify(destroy, &wait_ctx);
}

inline void wait(wait_context& wait_ctx, task_group_context& ctx) {
    r1::wait(wait_ctx, ctx);
    call_itt_task_notify(acquired, &wait_ctx);
    call_itt_task_notify(destroy, &wait_ctx);
}
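
// Example (illustrative sketch, not part of this header): the canonical
// spawn/wait pairing. It assumes the spawned task releases wait_ctx on
// completion, as the task sketch further below does.
//
//     d1::wait_context wait_ctx{1};   // one reference for the task below
//     d1::spawn(t, ctx);              // t calls wait_ctx.release() when done
//     d1::wait(wait_ctx, ctx);        // blocks until the count reaches zero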

using r1::current_context;

class task_traits {
    std::uint64_t m_version_and_traits{};
    friend struct r1::task_accessor;
};

//! Alignment for a task object
static constexpr std::size_t task_alignment = 64;

//! Base class for user-defined tasks.
/** @ingroup task_scheduling */
class alignas(task_alignment) task : public task_traits {
protected:
    virtual ~task() = default;

public:
    virtual task* execute(execution_data&) = 0;
    virtual task* cancel(execution_data&) = 0;

private:
    std::uint64_t m_reserved[6]{};
    friend struct r1::task_accessor;
};
static_assert(sizeof(task) == task_alignment, "task size is broken");
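
// Example (illustrative sketch, not part of this header): a minimal user-defined
// task. execute() may return another task to run next ("task bypass") or nullptr;
// cancel() runs instead of execute() when the group is cancelled. `do_work` is a
// hypothetical user function.
//
//     class my_task : public d1::task {
//         d1::wait_context& m_wait;
//     public:
//         explicit my_task(d1::wait_context& w) : m_wait(w) {}
//         d1::task* execute(d1::execution_data&) override {
//             do_work();
//             m_wait.release();   // signal completion to the waiter
//             return nullptr;
//         }
//         d1::task* cancel(d1::execution_data&) override {
//             m_wait.release();   // keep the reference count balanced
//             return nullptr;
//         }
//     };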

} // namespace d1
} // namespace detail
} // namespace tbb

#endif /* __TBB__task_H */