xref: /oneTBB/include/oneapi/tbb/task_arena.h (revision c4568449)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_task_arena_H
18 #define __TBB_task_arena_H
19 
20 #include "detail/_config.h"
21 
22 #include "detail/_aligned_space.h"
23 #include "detail/_attach.h"
24 #include "detail/_exception.h"
25 #include "detail/_namespace_injection.h"
26 #include "detail/_small_object_pool.h"
27 #include "detail/_task.h"
28 
29 #include "detail/_task_handle.h"
30 
31 #if __TBB_ARENA_BINDING
32 #include "info.h"
33 #endif /*__TBB_ARENA_BINDING*/
34 
35 namespace tbb {
36 namespace detail {
37 
38 namespace d1 {
39 
40 template<typename F, typename R>
41 class task_arena_function : public delegate_base {
42     F &my_func;
43     aligned_space<R> my_return_storage;
44     bool my_constructed{false};
45     // The function should be called only once.
operator()46     bool operator()() const override {
47         new (my_return_storage.begin()) R(my_func());
48         return true;
49     }
50 public:
task_arena_function(F & f)51     task_arena_function(F& f) : my_func(f) {}
52     // The function can be called only after operator() and only once.
consume_result()53     R consume_result() {
54         my_constructed = true;
55         return std::move(*(my_return_storage.begin()));
56     }
~task_arena_function()57     ~task_arena_function() override {
58         if (my_constructed) {
59             my_return_storage.begin()->~R();
60         }
61     }
62 };
63 
//! Specialization for void-returning callables: no result storage is needed.
template<typename F>
class task_arena_function<F,void> : public delegate_base {
    F &my_func;
    // Invokes the wrapped callable; returning true reports successful delegation.
    bool operator()() const override {
        my_func();
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    // Nothing to hand back for a void-returning callable.
    void consume_result() const {}

    friend class task_arena_base;
};
77 
78 class task_arena_base;
79 class task_scheduler_observer;
80 } // namespace d1
81 
82 namespace r1 {
83 class arena;
84 struct task_arena_impl;
85 
86 TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
87 TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
88 TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
89 TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
90 TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
91 TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
92 TBB_EXPORT int  __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
93 TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);
94 
95 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
96 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
97 TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
98 } // namespace r1
99 
100 namespace d2 {
enqueue_impl(task_handle && th,d1::task_arena_base * ta)101 inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
102     __TBB_ASSERT(th != nullptr, "Attempt to schedule empty task_handle");
103 
104     auto& ctx = task_handle_accessor::ctx_of(th);
105 
106     // Do not access th after release
107     r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
108 }
109 } //namespace d2
110 
111 namespace d1 {
112 
//! Number of distinct arena priority levels (low/normal/high).
static constexpr unsigned num_priority_levels = 3;
//! Gap between adjacent priority values; the +1 keeps the highest level below INT_MAX.
static constexpr int priority_stride = INT_MAX / (num_priority_levels + 1);
115 
//! Settings holder and ABI-stable base of task_arena.
//! Stores the user-requested configuration plus the lazily-created pointer to
//! the runtime arena; all real work is delegated to the r1 entry points.
class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    //! Arena priorities, spaced by priority_stride so the runtime can compare them numerically.
    enum class priority : int {
        low    = 1 * priority_stride,
        normal = 2 * priority_stride,
        high   = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
protected:
    //! Special settings (trait bits such as core_type_support_flag)
    intptr_t my_version_and_traits;

    //! Tracks deferred initialization; driven by atomic_do_once in task_arena::initialize().
    std::atomic<do_once_state> my_initialization_state;

    //! nullptr if not currently initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
        "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved slots for external threads
    unsigned my_num_reserved_slots;

    //! Arena priority
    priority my_priority;

    //! The NUMA node index to which the arena will be attached
    numa_node_id my_numa_id;

    //! The core type index to which arena will be attached
    core_type_id my_core_type;

    //! Number of threads per core
    int my_max_threads_per_core;

    // Backward compatibility checks: objects built against an older header may
    // lack the core-type fields, so consult the trait bit before reading them.
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    //! Records the requested settings; the runtime arena itself is created lazily.
    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
        {}

#if __TBB_ARENA_BINDING
    //! Same as above, but with NUMA/core-type placement taken from `constraints_`.
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
        {}
#endif /*__TBB_ARENA_BINDING*/
public:
    //! Typedef for number of threads that is automatic.
    static const int automatic = -1;
    //! Sentinel returned by current_thread_index() when the thread is in no arena.
    static const int not_initialized = -2;
};
200 
201 template<typename R, typename F>
isolate_impl(F & f)202 R isolate_impl(F& f) {
203     task_arena_function<F, R> func(f);
204     r1::isolate_within_arena(func, /*isolation*/ 0);
205     return func.consume_result();
206 }
207 
//! One-shot task that runs a copy of the user functor and then destroys itself.
template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator;
    const F m_func;

    // Returns this task's memory to the small-object pool; `this` is dead afterwards.
    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        // Must be the last use of `this`: finalize() deallocates the task.
        finalize(ed);
        return nullptr;
    }
    // Enqueued tasks have no owner to propagate cancellation to; reaching here is a bug.
    task* cancel(execution_data&) override {
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};
229 
230 template<typename F>
enqueue_impl(F && f,task_arena_base * ta)231 void enqueue_impl(F&& f, task_arena_base* ta) {
232     small_object_allocator alloc{};
233     r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
234 }
235 /** 1-to-1 proxy representation class of scheduler's arena
236  * Constructors set up settings only, real construction is deferred till the first method invocation
237  * Destructor only removes one of the references to the inner arena representation.
238  * Final destruction happens when all the references (and the work) are gone.
239  */
class task_arena : public task_arena_base {

    // Publishes do_once_state::initialized with release semantics; the runtime
    // must already have stored my_arena by this point.
    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    // Common implementation of execute(): wraps f into a delegate, runs it via
    // the runtime, and extracts the stored result (nothing for R = void).
    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    //! Creates task_arena with certain concurrency limits
    /** Sets up settings only, real construction is deferred till the first method invocation
     *  @arg max_concurrency specifies total number of slots in arena where threads work
     *  @arg reserved_for_masters specifies number of slots to be used by external threads only.
     *       Value of 1 is default and reflects behavior of implicit arenas.
     **/
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}

#if __TBB_ARENA_BINDING
    //! Creates task arena pinned to certain NUMA node
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}

    //! Copies settings from another task_arena
    task_arena(const task_arena &s) // copy settings but not the reference or instance
        : task_arena_base(
            constraints{}
                .set_numa_id(s.my_numa_id)
                .set_max_concurrency(s.my_max_concurrency)
                .set_core_type(s.my_core_type)
                .set_max_threads_per_core(s.my_max_threads_per_core)
            , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    //! Copies settings from another task_arena
    task_arena(const task_arena& a) // copy settings but not the reference or instance
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif /*__TBB_ARENA_BINDING*/

    //! Tag class used to indicate the "attaching" constructor
    struct attach {};

    //! Creates an instance of task_arena attached to the current arena of the thread
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
    {
        // On attach failure the arena stays uninitialized and is lazily
        // created with the default settings above on first use.
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }

    //! Creates an instance of task_arena attached to the current arena of the thread
    explicit task_arena(d1::attach)
        : task_arena(attach{})
    {}

    //! Forces allocation of the resources for the task_arena as specified in constructor arguments
    void initialize() {
        // Thread-safe: at most one thread performs r1::initialize(), others wait.
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    //! Overrides concurrency level and forces initialization of internal representation
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }

#if __TBB_ARENA_BINDING
    //! Overrides the placement constraints and forces initialization of internal representation
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif /*__TBB_ARENA_BINDING*/

    //! Attaches this instance to the current arena of the thread
    void initialize(attach) {
        // TODO: decide if this call must be thread-safe
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                // Nothing to attach to: fall back to creating an arena from the stored settings.
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    //! Attaches this instance to the current arena of the thread
    void initialize(d1::attach) {
        initialize(attach{});
    }

    //! Removes the reference to the internal arena representation.
    //! Not thread safe wrt concurrent invocations of other methods.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    //! Removes the reference to the internal arena representation, and destroys the external object.
    //! Not thread safe wrt concurrent invocations of other methods.
    ~task_arena() {
        terminate();
    }

    //! Returns true if the arena is active (initialized); false otherwise.
    //! The name was chosen to match a task_scheduler_init method with the same semantics.
    bool is_active() const {
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    //! Enqueues a task into the arena to process a functor, and immediately returns.
    //! Does not require the calling thread to join the arena

    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }

    //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
    //! Does not require the calling thread to join the arena
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }

    //! Joins the arena and executes a mutable functor, then returns
    //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
    //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
    //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }

#if __TBB_EXTRA_DEBUG
    //! Returns my_num_reserved_slots
    int debug_reserved_slots() const {
        // Handle special cases inside the library
        return my_num_reserved_slots;
    }

    //! Returns my_max_concurrency
    int debug_max_concurrency() const {
        // Handle special cases inside the library
        return my_max_concurrency;
    }

    //! Wait for all work in the arena to be completed
    //! Even submitted by other application threads
    //! Joins arena if/when possible (in the same way as execute())
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif //__TBB_EXTRA_DEBUG

    //! Returns the maximal number of threads that can work inside the arena
    int max_concurrency() const {
        // Handle special cases inside the library
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }

    //! Submits a task directly into an already-active arena (internal use by the runtime).
    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};
441 
442 //! Executes a mutable functor in isolation within the current task arena.
443 //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
444 template<typename F>
445 inline auto isolate(F&& f) -> decltype(f()) {
446     return isolate_impl<decltype(f())>(f);
447 }
448 
449 //! Returns the index, aka slot number, of the calling thread in its current arena
current_thread_index()450 inline int current_thread_index() {
451     slot_id idx = r1::execution_slot(nullptr);
452     return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
453 }
454 
455 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
is_inside_task()456 inline bool is_inside_task() {
457     return nullptr != current_context();
458 }
459 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
460 
461 //! Returns the maximal number of threads that can work inside the arena
max_concurrency()462 inline int max_concurrency() {
463     return r1::max_concurrency(nullptr);
464 }
465 
enqueue(d2::task_handle && th)466 inline void enqueue(d2::task_handle&& th) {
467     d2::enqueue_impl(std::move(th), nullptr);
468 }
469 
470 template<typename F>
enqueue(F && f)471 inline void enqueue(F&& f) {
472     enqueue_impl(std::forward<F>(f), nullptr);
473 }
474 
475 using r1::submit;
476 
477 } // namespace d1
478 } // namespace detail
479 
480 inline namespace v1 {
481 using detail::d1::task_arena;
482 using detail::d1::attach;
483 
484 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
485 using detail::d1::is_inside_task;
486 #endif
487 
488 namespace this_task_arena {
489 using detail::d1::current_thread_index;
490 using detail::d1::max_concurrency;
491 using detail::d1::isolate;
492 
493 using detail::d1::enqueue;
494 } // namespace this_task_arena
495 
496 } // inline namespace v1
497 
498 } // namespace tbb
499 #endif /* __TBB_task_arena_H */
500