1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_task_arena_H
18 #define __TBB_task_arena_H
19
20 #include "detail/_config.h"
21
22 #include "detail/_aligned_space.h"
23 #include "detail/_attach.h"
24 #include "detail/_exception.h"
25 #include "detail/_namespace_injection.h"
26 #include "detail/_small_object_pool.h"
27 #include "detail/_task.h"
28
29 #include "detail/_task_handle.h"
30
31 #if __TBB_ARENA_BINDING
32 #include "info.h"
33 #endif /*__TBB_ARENA_BINDING*/
34
35 namespace tbb {
36 namespace detail {
37
38 namespace d1 {
39
//! Type-erased delegate that runs a functor and stores its non-void result.
/** Bridges a user callable F returning R to the r1 entry points, which invoke
    it through the delegate_base interface. The result lives in raw aligned
    storage and its lifetime is managed manually below. */
template<typename F, typename R>
class task_arena_function : public delegate_base {
    F &my_func;
    //! Raw storage for the result; constructed in place by operator().
    aligned_space<R> my_return_storage;
    //! True once consume_result() ran; tells the destructor to destroy the stored value.
    bool my_constructed{false};
    // The function should be called only once.
    bool operator()() const override {
        // Placement-construct the result into the aligned storage.
        new (my_return_storage.begin()) R(my_func());
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    // The function can be called only after operator() and only once.
    R consume_result() {
        // Mark as constructed so ~task_arena_function destroys the (moved-from) value.
        my_constructed = true;
        return std::move(*(my_return_storage.begin()));
    }
    ~task_arena_function() override {
        if (my_constructed) {
            my_return_storage.begin()->~R();
        }
    }
};
63
//! Specialization for void-returning functors: nothing to store or destroy.
template<typename F>
class task_arena_function<F,void> : public delegate_base {
    F &my_func;
    bool operator()() const override {
        my_func();
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    //! No result to hand back; kept for interface parity with the primary template.
    void consume_result() const {}

    friend class task_arena_base;
};
77
78 class task_arena_base;
79 class task_scheduler_observer;
80 } // namespace d1
81
82 namespace r1 {
83 class arena;
84 struct task_arena_impl;
85
86 TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
87 TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
88 TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
89 TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
90 TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
91 TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
92 TBB_EXPORT int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
93 TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);
94
95 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
96 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
97 TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
98 } // namespace r1
99
100 namespace d2 {
enqueue_impl(task_handle && th,d1::task_arena_base * ta)101 inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
102 __TBB_ASSERT(th != nullptr, "Attempt to schedule empty task_handle");
103
104 auto& ctx = task_handle_accessor::ctx_of(th);
105
106 // Do not access th after release
107 r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
108 }
109 } //namespace d2
110
111 namespace d1 {
112
//! Number of distinct arena priority levels (low/normal/high).
static constexpr unsigned num_priority_levels = 3;
//! Spacing between priority enum values; the +1 keeps the highest value below INT_MAX.
static constexpr int priority_stride = INT_MAX / (num_priority_levels + 1);
115
//! Settings/state base for task_arena.
/** Fields are read and written by the r1 library entry points (friends below),
    so member order and sizes must stay ABI-compatible. */
class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    //! Arena priorities; numeric values are multiples of priority_stride.
    enum class priority : int {
        low = 1 * priority_stride,
        normal = 2 * priority_stride,
        high = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
protected:
    //! Special settings
    intptr_t my_version_and_traits;

    //! One-time guard for the deferred (lazy) arena construction.
    std::atomic<do_once_state> my_initialization_state;

    //! nullptr if not currently initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
                  "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved slots for external threads
    unsigned my_num_reserved_slots;

    //! Arena priority
    priority my_priority;

    //! The NUMA node index to which the arena will be attached
    numa_node_id my_numa_id;

    //! The core type index to which arena will be attached
    core_type_id my_core_type;

    //! Number of threads per core
    int my_max_threads_per_core;

    // Backward compatibility checks.
    //! Returns my_core_type only when the traits flag says the field is valid; 'automatic' otherwise.
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    //! Same traits-flag guard applied to my_max_threads_per_core.
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
    {}

#if __TBB_ARENA_BINDING
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
    {}
#endif /*__TBB_ARENA_BINDING*/
public:
    //! Typedef for number of threads that is automatic.
    static const int automatic = -1;
    //! Sentinel returned by current_thread_index() when the thread has no arena slot.
    static const int not_initialized = -2;
};
200
201 template<typename R, typename F>
isolate_impl(F & f)202 R isolate_impl(F& f) {
203 task_arena_function<F, R> func(f);
204 r1::isolate_within_arena(func, /*isolation*/ 0);
205 return func.consume_result();
206 }
207
//! Task wrapper that owns a copy of the enqueued functor and frees itself after running.
template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator;
    const F m_func;

    //! Returns this task's memory to the small-object pool.
    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        finalize(ed);  // self-destructs: no member access is valid past this point
        return nullptr;
    }
    task* cancel(execution_data&) override {
        // Enqueued tasks have no owner waiting on them, so cancellation is a hard error here.
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};
229
230 template<typename F>
enqueue_impl(F && f,task_arena_base * ta)231 void enqueue_impl(F&& f, task_arena_base* ta) {
232 small_object_allocator alloc{};
233 r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
234 }
/** 1-to-1 proxy representation class of scheduler's arena
 * Constructors set up settings only, real construction is deferred till the first method invocation
 * Destructor only removes one of the references to the inner arena representation.
 * Final destruction happens when all the references (and the work) are gone.
 */
class task_arena : public task_arena_base {

    //! Publishes the initialized state; pairs with the acquire load in is_active().
    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    //! Lazily initializes the arena, then runs f inside it via a type-erased delegate.
    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    //! Creates task_arena with certain concurrency limits
    /** Sets up settings only, real construction is deferred till the first method invocation
     * @arg max_concurrency specifies total number of slots in arena where threads work
     * @arg reserved_for_masters specifies number of slots to be used by external threads only.
     * Value of 1 is default and reflects behavior of implicit arenas.
     **/
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}

#if __TBB_ARENA_BINDING
    //! Creates task arena pinned to certain NUMA node
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}

    //! Copies settings from another task_arena
    task_arena(const task_arena &s) // copy settings but not the reference or instance
        : task_arena_base(
            constraints{}
                .set_numa_id(s.my_numa_id)
                .set_max_concurrency(s.my_max_concurrency)
                .set_core_type(s.my_core_type)
                .set_max_threads_per_core(s.my_max_threads_per_core)
          , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    //! Copies settings from another task_arena
    task_arena(const task_arena& a) // copy settings but not the reference or instance
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif /*__TBB_ARENA_BINDING*/

    //! Tag class used to indicate the "attaching" constructor
    struct attach {};

    //! Creates an instance of task_arena attached to the current arena of the thread
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
    {
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }

    //! Creates an instance of task_arena attached to the current arena of the thread
    explicit task_arena(d1::attach)
        : task_arena(attach{})
    {}

    //! Forces allocation of the resources for the task_arena as specified in constructor arguments
    void initialize() {
        // atomic_do_once ensures r1::initialize runs at most once even under concurrent calls.
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    //! Overrides concurrency level and forces initialization of internal representation
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }

#if __TBB_ARENA_BINDING
    //! Overrides the binding constraints and forces initialization of internal representation
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif /*__TBB_ARENA_BINDING*/

    //! Attaches this instance to the current arena of the thread
    void initialize(attach) {
        // TODO: decide if this call must be thread-safe
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                // No arena to attach to: fall back to creating one from the current settings.
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    //! Attaches this instance to the current arena of the thread
    void initialize(d1::attach) {
        initialize(attach{});
    }

    //! Removes the reference to the internal arena representation.
    //! Not thread safe wrt concurrent invocations of other methods.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    //! Removes the reference to the internal arena representation, and destroys the external object.
    //! Not thread safe wrt concurrent invocations of other methods.
    ~task_arena() {
        terminate();
    }

    //! Returns true if the arena is active (initialized); false otherwise.
    //! The name was chosen to match a task_scheduler_init method with the same semantics.
    bool is_active() const {
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    //! Enqueues a task into the arena to process a functor, and immediately returns.
    //! Does not require the calling thread to join the arena

    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }

    //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
    //! Does not require the calling thread to join the arena
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }

    //! Joins the arena and executes a mutable functor, then returns
    //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
    //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
    //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }

#if __TBB_EXTRA_DEBUG
    //! Returns my_num_reserved_slots
    int debug_reserved_slots() const {
        // Handle special cases inside the library
        return my_num_reserved_slots;
    }

    //! Returns my_max_concurrency
    int debug_max_concurrency() const {
        // Handle special cases inside the library
        return my_max_concurrency;
    }

    //! Wait for all work in the arena to be completed
    //! Even submitted by other application threads
    //! Joins arena if/when possible (in the same way as execute())
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif //__TBB_EXTRA_DEBUG

    //! Returns the maximal number of threads that can work inside the arena
    int max_concurrency() const {
        // Handle special cases inside the library
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }

    //! Submits a task directly into an already-active arena, optionally as critical work.
    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};
441
//! Executes a mutable functor in isolation within the current task arena.
//! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
template<typename F>
inline auto isolate(F&& f) -> decltype(f()) {
    return isolate_impl<decltype(f())>(f);
}
448
449 //! Returns the index, aka slot number, of the calling thread in its current arena
current_thread_index()450 inline int current_thread_index() {
451 slot_id idx = r1::execution_slot(nullptr);
452 return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
453 }
454
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Returns whether the calling thread currently executes within some task.
inline bool is_inside_task() {
    return current_context() != nullptr;
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
460
//! Returns the maximal number of threads that can work inside the arena
inline int max_concurrency() {
    // nullptr asks the library about the calling thread's current arena.
    return r1::max_concurrency(nullptr);
}
465
//! Enqueues the task held by the handle into the calling thread's current arena.
inline void enqueue(d2::task_handle&& th) {
    d2::enqueue_impl(std::move(th), nullptr);
}
469
//! Enqueues a functor for processing in the calling thread's current arena.
template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), nullptr);
}
474
475 using r1::submit;
476
477 } // namespace d1
478 } // namespace detail
479
480 inline namespace v1 {
481 using detail::d1::task_arena;
482 using detail::d1::attach;
483
484 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
485 using detail::d1::is_inside_task;
486 #endif
487
488 namespace this_task_arena {
489 using detail::d1::current_thread_index;
490 using detail::d1::max_concurrency;
491 using detail::d1::isolate;
492
493 using detail::d1::enqueue;
494 } // namespace this_task_arena
495
496 } // inline namespace v1
497
498 } // namespace tbb
499 #endif /* __TBB_task_arena_H */
500