xref: /oneTBB/include/oneapi/tbb/task_arena.h (revision 3e352b48)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_task_arena_H
18 #define __TBB_task_arena_H
19 
20 #include "detail/_config.h"
21 
22 #include "detail/_aligned_space.h"
23 #include "detail/_attach.h"
24 #include "detail/_exception.h"
25 #include "detail/_namespace_injection.h"
26 #include "detail/_small_object_pool.h"
27 #include "detail/_task.h"
28 
29 #include "detail/_task_handle.h"
30 
31 #if __TBB_ARENA_BINDING
32 #include "info.h"
33 #endif /*__TBB_ARENA_BINDING*/
34 
35 namespace tbb {
36 namespace detail {
37 
38 namespace d1 {
39 
40 template<typename F, typename R>
41 class task_arena_function : public delegate_base {
42     F &my_func;
43     aligned_space<R> my_return_storage;
44     bool my_constructed{false};
45     // The function should be called only once.
46     bool operator()() const override {
47         new (my_return_storage.begin()) R(my_func());
48         return true;
49     }
50 public:
51     task_arena_function(F& f) : my_func(f) {}
52     // The function can be called only after operator() and only once.
53     R consume_result() {
54         my_constructed = true;
55         return std::move(*(my_return_storage.begin()));
56     }
57     ~task_arena_function() override {
58         if (my_constructed) {
59             my_return_storage.begin()->~R();
60         }
61     }
62 };
63 
64 template<typename F>
65 class task_arena_function<F,void> : public delegate_base {
66     F &my_func;
67     bool operator()() const override {
68         my_func();
69         return true;
70     }
71 public:
72     task_arena_function(F& f) : my_func(f) {}
73     void consume_result() const {}
74 
75     friend class task_arena_base;
76 };
77 
78 class task_arena_base;
79 class task_scheduler_observer;
80 } // namespace d1
81 
82 namespace r1 {
83 class arena;
84 struct task_arena_impl;
85 
86 TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
87 TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
88 TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
89 TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
90 TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
91 TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
92 TBB_EXPORT int  __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
93 TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);
94 
95 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
96 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
97 TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
98 } // namespace r1
99 
100 namespace d2 {
101 inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
102     __TBB_ASSERT(th != nullptr, "Attempt to schedule empty task_handle");
103 
104     auto& ctx = task_handle_accessor::ctx_of(th);
105 
106     // Do not access th after release
107     r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
108 }
109 } //namespace d2
110 
111 namespace d1 {
112 
//! Distance between consecutive task_arena priority values: a quarter of INT_MAX.
static constexpr int priority_stride{INT_MAX / 4};
114 
115 class task_arena_base {
116     friend struct r1::task_arena_impl;
117     friend void r1::observe(d1::task_scheduler_observer&, bool);
118 public:
119     enum class priority : int {
120         low    = 1 * priority_stride,
121         normal = 2 * priority_stride,
122         high   = 3 * priority_stride
123     };
124 #if __TBB_ARENA_BINDING
125     using constraints = tbb::detail::d1::constraints;
126 #endif /*__TBB_ARENA_BINDING*/
127 protected:
128     //! Special settings
129     intptr_t my_version_and_traits;
130 
131     std::atomic<do_once_state> my_initialization_state;
132 
133     //! nullptr if not currently initialized.
134     std::atomic<r1::arena*> my_arena;
135     static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
136         "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");
137 
138     //! Concurrency level for deferred initialization
139     int my_max_concurrency;
140 
141     //! Reserved slots for external threads
142     unsigned my_num_reserved_slots;
143 
144     //! Arena priority
145     priority my_priority;
146 
147     //! The NUMA node index to which the arena will be attached
148     numa_node_id my_numa_id;
149 
150     //! The core type index to which arena will be attached
151     core_type_id my_core_type;
152 
153     //! Number of threads per core
154     int my_max_threads_per_core;
155 
156     // Backward compatibility checks.
157     core_type_id core_type() const {
158         return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
159     }
160     int max_threads_per_core() const {
161         return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
162     }
163 
164     enum {
165         default_flags = 0
166         , core_type_support_flag = 1
167     };
168 
169     task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
170         : my_version_and_traits(default_flags | core_type_support_flag)
171         , my_initialization_state(do_once_state::uninitialized)
172         , my_arena(nullptr)
173         , my_max_concurrency(max_concurrency)
174         , my_num_reserved_slots(reserved_for_masters)
175         , my_priority(a_priority)
176         , my_numa_id(automatic)
177         , my_core_type(automatic)
178         , my_max_threads_per_core(automatic)
179         {}
180 
181 #if __TBB_ARENA_BINDING
182     task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
183         : my_version_and_traits(default_flags | core_type_support_flag)
184         , my_initialization_state(do_once_state::uninitialized)
185         , my_arena(nullptr)
186         , my_max_concurrency(constraints_.max_concurrency)
187         , my_num_reserved_slots(reserved_for_masters)
188         , my_priority(a_priority)
189         , my_numa_id(constraints_.numa_id)
190 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
191         , my_core_type(constraints_.core_type)
192         , my_max_threads_per_core(constraints_.max_threads_per_core)
193 #else
194         , my_core_type(automatic)
195         , my_max_threads_per_core(automatic)
196 #endif
197         {}
198 #endif /*__TBB_ARENA_BINDING*/
199 public:
200     //! Typedef for number of threads that is automatic.
201     static const int automatic = -1;
202     static const int not_initialized = -2;
203 };
204 
205 template<typename R, typename F>
206 R isolate_impl(F& f) {
207     task_arena_function<F, R> func(f);
208     r1::isolate_within_arena(func, /*isolation*/ 0);
209     return func.consume_result();
210 }
211 
212 template <typename F>
213 class enqueue_task : public task {
214     small_object_allocator m_allocator;
215     const F m_func;
216 
217     void finalize(const execution_data& ed) {
218         m_allocator.delete_object(this, ed);
219     }
220     task* execute(execution_data& ed) override {
221         m_func();
222         finalize(ed);
223         return nullptr;
224     }
225     task* cancel(execution_data&) override {
226         __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
227         return nullptr;
228     }
229 public:
230     enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
231     enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
232 };
233 
234 template<typename F>
235 void enqueue_impl(F&& f, task_arena_base* ta) {
236     small_object_allocator alloc{};
237     r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
238 }
239 /** 1-to-1 proxy representation class of scheduler's arena
240  * Constructors set up settings only, real construction is deferred till the first method invocation
241  * Destructor only removes one of the references to the inner arena representation.
242  * Final destruction happens when all the references (and the work) are gone.
243  */
244 class task_arena : public task_arena_base {
245 
246     void mark_initialized() {
247         __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
248         my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
249     }
250 
251     template<typename R, typename F>
252     R execute_impl(F& f) {
253         initialize();
254         task_arena_function<F, R> func(f);
255         r1::execute(*this, func);
256         return func.consume_result();
257     }
258 public:
259     //! Creates task_arena with certain concurrency limits
260     /** Sets up settings only, real construction is deferred till the first method invocation
261      *  @arg max_concurrency specifies total number of slots in arena where threads work
262      *  @arg reserved_for_masters specifies number of slots to be used by external threads only.
263      *       Value of 1 is default and reflects behavior of implicit arenas.
264      **/
265     task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
266                priority a_priority = priority::normal)
267         : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
268     {}
269 
270 #if __TBB_ARENA_BINDING
271     //! Creates task arena pinned to certain NUMA node
272     task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
273                priority a_priority = priority::normal)
274         : task_arena_base(constraints_, reserved_for_masters, a_priority)
275     {}
276 
277     //! Copies settings from another task_arena
278     task_arena(const task_arena &s) // copy settings but not the reference or instance
279         : task_arena_base(
280             constraints{}
281                 .set_numa_id(s.my_numa_id)
282                 .set_max_concurrency(s.my_max_concurrency)
283 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
284                 .set_core_type(s.my_core_type)
285                 .set_max_threads_per_core(s.my_max_threads_per_core)
286 #endif
287             , s.my_num_reserved_slots, s.my_priority)
288     {}
289 #else
290     //! Copies settings from another task_arena
291     task_arena(const task_arena& a) // copy settings but not the reference or instance
292         : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
293     {}
294 #endif /*__TBB_ARENA_BINDING*/
295 
296     //! Tag class used to indicate the "attaching" constructor
297     struct attach {};
298 
299     //! Creates an instance of task_arena attached to the current arena of the thread
300     explicit task_arena( attach )
301         : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
302     {
303         if (r1::attach(*this)) {
304             mark_initialized();
305         }
306     }
307 
308     //! Creates an instance of task_arena attached to the current arena of the thread
309     explicit task_arena(d1::attach)
310         : task_arena(attach{})
311     {}
312 
313     //! Forces allocation of the resources for the task_arena as specified in constructor arguments
314     void initialize() {
315         atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
316     }
317 
318     //! Overrides concurrency level and forces initialization of internal representation
319     void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
320                     priority a_priority = priority::normal)
321     {
322         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
323         if( !is_active() ) {
324             my_max_concurrency = max_concurrency_;
325             my_num_reserved_slots = reserved_for_masters;
326             my_priority = a_priority;
327             r1::initialize(*this);
328             mark_initialized();
329         }
330     }
331 
332 #if __TBB_ARENA_BINDING
333     void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
334                     priority a_priority = priority::normal)
335     {
336         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
337         if( !is_active() ) {
338             my_numa_id = constraints_.numa_id;
339             my_max_concurrency = constraints_.max_concurrency;
340 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
341             my_core_type = constraints_.core_type;
342             my_max_threads_per_core = constraints_.max_threads_per_core;
343 #endif
344             my_num_reserved_slots = reserved_for_masters;
345             my_priority = a_priority;
346             r1::initialize(*this);
347             mark_initialized();
348         }
349     }
350 #endif /*__TBB_ARENA_BINDING*/
351 
352     //! Attaches this instance to the current arena of the thread
353     void initialize(attach) {
354         // TODO: decide if this call must be thread-safe
355         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
356         if( !is_active() ) {
357             if ( !r1::attach(*this) ) {
358                 r1::initialize(*this);
359             }
360             mark_initialized();
361         }
362     }
363 
364     //! Attaches this instance to the current arena of the thread
365     void initialize(d1::attach) {
366         initialize(attach{});
367     }
368 
369     //! Removes the reference to the internal arena representation.
370     //! Not thread safe wrt concurrent invocations of other methods.
371     void terminate() {
372         if( is_active() ) {
373             r1::terminate(*this);
374             my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
375         }
376     }
377 
378     //! Removes the reference to the internal arena representation, and destroys the external object.
379     //! Not thread safe wrt concurrent invocations of other methods.
380     ~task_arena() {
381         terminate();
382     }
383 
384     //! Returns true if the arena is active (initialized); false otherwise.
385     //! The name was chosen to match a task_scheduler_init method with the same semantics.
386     bool is_active() const {
387         return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
388     }
389 
390     //! Enqueues a task into the arena to process a functor, and immediately returns.
391     //! Does not require the calling thread to join the arena
392 
393     template<typename F>
394     void enqueue(F&& f) {
395         initialize();
396         enqueue_impl(std::forward<F>(f), this);
397     }
398 
399     //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
400     //! Does not require the calling thread to join the arena
401     void enqueue(d2::task_handle&& th) {
402         initialize();
403         d2::enqueue_impl(std::move(th), this);
404     }
405 
406     //! Joins the arena and executes a mutable functor, then returns
407     //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
408     //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
409     //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
410     template<typename F>
411     auto execute(F&& f) -> decltype(f()) {
412         return execute_impl<decltype(f())>(f);
413     }
414 
415 #if __TBB_EXTRA_DEBUG
416     //! Returns my_num_reserved_slots
417     int debug_reserved_slots() const {
418         // Handle special cases inside the library
419         return my_num_reserved_slots;
420     }
421 
422     //! Returns my_max_concurrency
423     int debug_max_concurrency() const {
424         // Handle special cases inside the library
425         return my_max_concurrency;
426     }
427 
428     //! Wait for all work in the arena to be completed
429     //! Even submitted by other application threads
430     //! Joins arena if/when possible (in the same way as execute())
431     void debug_wait_until_empty() {
432         initialize();
433         r1::wait(*this);
434     }
435 #endif //__TBB_EXTRA_DEBUG
436 
437     //! Returns the maximal number of threads that can work inside the arena
438     int max_concurrency() const {
439         // Handle special cases inside the library
440         return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
441     }
442 
443     friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
444         __TBB_ASSERT(ta.is_active(), nullptr);
445         call_itt_task_notify(releasing, &t);
446         r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
447     }
448 };
449 
450 //! Executes a mutable functor in isolation within the current task arena.
451 //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
452 template<typename F>
453 inline auto isolate(F&& f) -> decltype(f()) {
454     return isolate_impl<decltype(f())>(f);
455 }
456 
457 //! Returns the index, aka slot number, of the calling thread in its current arena
458 inline int current_thread_index() {
459     slot_id idx = r1::execution_slot(nullptr);
460     return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
461 }
462 
463 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
464 inline bool is_inside_task() {
465     return nullptr != current_context();
466 }
467 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
468 
469 //! Returns the maximal number of threads that can work inside the arena
470 inline int max_concurrency() {
471     return r1::max_concurrency(nullptr);
472 }
473 
474 inline void enqueue(d2::task_handle&& th) {
475     d2::enqueue_impl(std::move(th), nullptr);
476 }
477 
//! Enqueues a task that processes the functor f without naming a task_arena object
//! (a null arena pointer is passed on).
template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), /*ta*/ nullptr);
}
482 
483 using r1::submit;
484 
485 } // namespace d1
486 } // namespace detail
487 
488 inline namespace v1 {
489 using detail::d1::task_arena;
490 using detail::d1::attach;
491 
492 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
493 using detail::d1::is_inside_task;
494 #endif
495 
496 namespace this_task_arena {
497 using detail::d1::current_thread_index;
498 using detail::d1::max_concurrency;
499 using detail::d1::isolate;
500 
501 using detail::d1::enqueue;
502 } // namespace this_task_arena
503 
504 } // inline namespace v1
505 
506 } // namespace tbb
507 #endif /* __TBB_task_arena_H */
508