xref: /oneTBB/include/oneapi/tbb/task_arena.h (revision 0a2b3987)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_task_arena_H
18 #define __TBB_task_arena_H
19 
20 #include "detail/_namespace_injection.h"
21 #include "detail/_task.h"
22 #include "detail/_exception.h"
23 #include "detail/_aligned_space.h"
24 #include "detail/_small_object_pool.h"
25 
26 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
27 #include "detail/_task_handle.h"
28 #endif
29 
30 #if __TBB_ARENA_BINDING
31 #include "info.h"
32 #endif /*__TBB_ARENA_BINDING*/
33 
34 namespace tbb {
35 namespace detail {
36 
37 namespace d1 {
38 
39 template<typename F, typename R>
40 class task_arena_function : public delegate_base {
41     F &my_func;
42     aligned_space<R> my_return_storage;
43     bool my_constructed{false};
44     // The function should be called only once.
45     bool operator()() const override {
46         new (my_return_storage.begin()) R(my_func());
47         return true;
48     }
49 public:
50     task_arena_function(F& f) : my_func(f) {}
51     // The function can be called only after operator() and only once.
52     R consume_result() {
53         my_constructed = true;
54         return std::move(*(my_return_storage.begin()));
55     }
56     ~task_arena_function() override {
57         if (my_constructed) {
58             my_return_storage.begin()->~R();
59         }
60     }
61 };
62 
63 template<typename F>
64 class task_arena_function<F,void> : public delegate_base {
65     F &my_func;
66     bool operator()() const override {
67         my_func();
68         return true;
69     }
70 public:
71     task_arena_function(F& f) : my_func(f) {}
72     void consume_result() const {}
73 
74     friend class task_arena_base;
75 };
76 
77 class task_arena_base;
78 class task_scheduler_observer;
79 } // namespace d1
80 
// Functions exported by the TBB runtime (namespace r1) that the inline code in
// this header calls. Only the declarations are visible here, so the notes below
// are limited to how this header uses them.
namespace r1 {
class arena;
struct task_arena_impl;

// Declared a friend of task_arena_base; not called from this header.
TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
// Called from every task_arena::initialize() overload.
TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
// Called from task_arena::terminate() when the arena is active.
TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
// Called by the attaching constructor and initialize(attach); the bool result
// is treated there as "attach succeeded".
TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
// Called from task_arena::execute_impl() with a task_arena_function delegate.
TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
// Called from task_arena::debug_wait_until_empty() (only under __TBB_EXTRA_DEBUG).
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
// Called with `this` from task_arena::max_concurrency() and with nullptr from
// the free function max_concurrency().
TBB_EXPORT int  __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
// Called from isolate_impl(), which passes 0 as the second argument.
TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);

// Called from d1::enqueue_impl() (functor wrapped in enqueue_task).
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
// Called from d2::enqueue_impl() (task taken out of a task_handle, with its context).
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
// Called from the friend function submit() of task_arena; the last argument is
// 1 when as_critical is true and 0 otherwise.
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
} // namespace r1
98 
99 namespace d2 {
100 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
101 inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
102     if (th == nullptr) {
103         throw_exception(exception_id::bad_task_handle);
104     }
105 
106     auto& ctx = task_handle_accessor::ctx_of(th);
107 
108     // Do not access th after release
109     r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
110 }
111 #endif// __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
112 
113 }
114 namespace d1 {
115 
//! Distance between consecutive task_arena_base::priority values:
//! low, normal and high are 1x, 2x and 3x this stride.
static constexpr int priority_stride = INT_MAX / 4;
117 
118 class task_arena_base {
119     friend struct r1::task_arena_impl;
120     friend void r1::observe(d1::task_scheduler_observer&, bool);
121 public:
122     enum class priority : int {
123         low    = 1 * priority_stride,
124         normal = 2 * priority_stride,
125         high   = 3 * priority_stride
126     };
127 #if __TBB_ARENA_BINDING
128     using constraints = tbb::detail::d1::constraints;
129 #endif /*__TBB_ARENA_BINDING*/
130 protected:
131     //! Special settings
132     intptr_t my_version_and_traits;
133 
134     std::atomic<do_once_state> my_initialization_state;
135 
136     //! NULL if not currently initialized.
137     std::atomic<r1::arena*> my_arena;
138     static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
139         "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");
140 
141     //! Concurrency level for deferred initialization
142     int my_max_concurrency;
143 
144     //! Reserved slots for external threads
145     unsigned my_num_reserved_slots;
146 
147     //! Arena priority
148     priority my_priority;
149 
150     //! The NUMA node index to which the arena will be attached
151     numa_node_id my_numa_id;
152 
153     //! The core type index to which arena will be attached
154     core_type_id my_core_type;
155 
156     //! Number of threads per core
157     int my_max_threads_per_core;
158 
159     // Backward compatibility checks.
160     core_type_id core_type() const {
161         return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
162     }
163     int max_threads_per_core() const {
164         return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
165     }
166 
167     enum {
168         default_flags = 0
169         , core_type_support_flag = 1
170     };
171 
172     task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
173         : my_version_and_traits(default_flags | core_type_support_flag)
174         , my_initialization_state(do_once_state::uninitialized)
175         , my_arena(nullptr)
176         , my_max_concurrency(max_concurrency)
177         , my_num_reserved_slots(reserved_for_masters)
178         , my_priority(a_priority)
179         , my_numa_id(automatic)
180         , my_core_type(automatic)
181         , my_max_threads_per_core(automatic)
182         {}
183 
184 #if __TBB_ARENA_BINDING
185     task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
186         : my_version_and_traits(default_flags | core_type_support_flag)
187         , my_initialization_state(do_once_state::uninitialized)
188         , my_arena(nullptr)
189         , my_max_concurrency(constraints_.max_concurrency)
190         , my_num_reserved_slots(reserved_for_masters)
191         , my_priority(a_priority)
192         , my_numa_id(constraints_.numa_id)
193 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
194         , my_core_type(constraints_.core_type)
195         , my_max_threads_per_core(constraints_.max_threads_per_core)
196 #else
197         , my_core_type(automatic)
198         , my_max_threads_per_core(automatic)
199 #endif
200         {}
201 #endif /*__TBB_ARENA_BINDING*/
202 public:
203     //! Typedef for number of threads that is automatic.
204     static const int automatic = -1;
205     static const int not_initialized = -2;
206 };
207 
208 template<typename R, typename F>
209 R isolate_impl(F& f) {
210     task_arena_function<F, R> func(f);
211     r1::isolate_within_arena(func, /*isolation*/ 0);
212     return func.consume_result();
213 }
214 
215 template <typename F>
216 class enqueue_task : public task {
217     small_object_allocator m_allocator;
218     const F m_func;
219 
220     void finalize(const execution_data& ed) {
221         m_allocator.delete_object(this, ed);
222     }
223     task* execute(execution_data& ed) override {
224         m_func();
225         finalize(ed);
226         return nullptr;
227     }
228     task* cancel(execution_data&) override {
229         __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
230         return nullptr;
231     }
232 public:
233     enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
234     enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
235 };
236 
237 template<typename F>
238 void enqueue_impl(F&& f, task_arena_base* ta) {
239     small_object_allocator alloc{};
240     r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
241 }
242 /** 1-to-1 proxy representation class of scheduler's arena
243  * Constructors set up settings only, real construction is deferred till the first method invocation
244  * Destructor only removes one of the references to the inner arena representation.
245  * Final destruction happens when all the references (and the work) are gone.
246  */
247 class task_arena : public task_arena_base {
248 
249     void mark_initialized() {
250         __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
251         my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
252     }
253 
254     template<typename R, typename F>
255     R execute_impl(F& f) {
256         initialize();
257         task_arena_function<F, R> func(f);
258         r1::execute(*this, func);
259         return func.consume_result();
260     }
261 public:
262     //! Creates task_arena with certain concurrency limits
263     /** Sets up settings only, real construction is deferred till the first method invocation
264      *  @arg max_concurrency specifies total number of slots in arena where threads work
265      *  @arg reserved_for_masters specifies number of slots to be used by external threads only.
266      *       Value of 1 is default and reflects behavior of implicit arenas.
267      **/
268     task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
269                priority a_priority = priority::normal)
270         : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
271     {}
272 
273 #if __TBB_ARENA_BINDING
274     //! Creates task arena pinned to certain NUMA node
275     task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
276                priority a_priority = priority::normal)
277         : task_arena_base(constraints_, reserved_for_masters, a_priority)
278     {}
279 
280     //! Copies settings from another task_arena
281     task_arena(const task_arena &s) // copy settings but not the reference or instance
282         : task_arena_base(
283             constraints{}
284                 .set_numa_id(s.my_numa_id)
285                 .set_max_concurrency(s.my_max_concurrency)
286 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
287                 .set_core_type(s.my_core_type)
288                 .set_max_threads_per_core(s.my_max_threads_per_core)
289 #endif
290             , s.my_num_reserved_slots, s.my_priority)
291     {}
292 #else
293     //! Copies settings from another task_arena
294     task_arena(const task_arena& a) // copy settings but not the reference or instance
295         : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
296     {}
297 #endif /*__TBB_ARENA_BINDING*/
298 
299     //! Tag class used to indicate the "attaching" constructor
300     struct attach {};
301 
302     //! Creates an instance of task_arena attached to the current arena of the thread
303     explicit task_arena( attach )
304         : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
305     {
306         if (r1::attach(*this)) {
307             mark_initialized();
308         }
309     }
310 
311     //! Forces allocation of the resources for the task_arena as specified in constructor arguments
312     void initialize() {
313         atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
314     }
315 
316     //! Overrides concurrency level and forces initialization of internal representation
317     void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
318                     priority a_priority = priority::normal)
319     {
320         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
321         if( !is_active() ) {
322             my_max_concurrency = max_concurrency_;
323             my_num_reserved_slots = reserved_for_masters;
324             my_priority = a_priority;
325             r1::initialize(*this);
326             mark_initialized();
327         }
328     }
329 
330 #if __TBB_ARENA_BINDING
331     void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
332                     priority a_priority = priority::normal)
333     {
334         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
335         if( !is_active() ) {
336             my_numa_id = constraints_.numa_id;
337             my_max_concurrency = constraints_.max_concurrency;
338 #if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
339             my_core_type = constraints_.core_type;
340             my_max_threads_per_core = constraints_.max_threads_per_core;
341 #endif
342             my_num_reserved_slots = reserved_for_masters;
343             my_priority = a_priority;
344             r1::initialize(*this);
345             mark_initialized();
346         }
347     }
348 #endif /*__TBB_ARENA_BINDING*/
349 
350     //! Attaches this instance to the current arena of the thread
351     void initialize(attach) {
352         // TODO: decide if this call must be thread-safe
353         __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
354         if( !is_active() ) {
355             if ( !r1::attach(*this) ) {
356                 r1::initialize(*this);
357             }
358             mark_initialized();
359         }
360     }
361 
362     //! Removes the reference to the internal arena representation.
363     //! Not thread safe wrt concurrent invocations of other methods.
364     void terminate() {
365         if( is_active() ) {
366             r1::terminate(*this);
367             my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
368         }
369     }
370 
371     //! Removes the reference to the internal arena representation, and destroys the external object.
372     //! Not thread safe wrt concurrent invocations of other methods.
373     ~task_arena() {
374         terminate();
375     }
376 
377     //! Returns true if the arena is active (initialized); false otherwise.
378     //! The name was chosen to match a task_scheduler_init method with the same semantics.
379     bool is_active() const {
380         return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
381     }
382 
383     //! Enqueues a task into the arena to process a functor, and immediately returns.
384     //! Does not require the calling thread to join the arena
385 
386     template<typename F>
387     void enqueue(F&& f) {
388         initialize();
389         enqueue_impl(std::forward<F>(f), this);
390     }
391 
392     //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
393     //! Does not require the calling thread to join the arena
394 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
395     void enqueue(d2::task_handle&& th) {
396         initialize();
397         d2::enqueue_impl(std::move(th), this);
398     }
399 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
400 
401     //! Joins the arena and executes a mutable functor, then returns
402     //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
403     //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
404     //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
405     template<typename F>
406     auto execute(F&& f) -> decltype(f()) {
407         return execute_impl<decltype(f())>(f);
408     }
409 
410 #if __TBB_EXTRA_DEBUG
411     //! Returns my_num_reserved_slots
412     int debug_reserved_slots() const {
413         // Handle special cases inside the library
414         return my_num_reserved_slots;
415     }
416 
417     //! Returns my_max_concurrency
418     int debug_max_concurrency() const {
419         // Handle special cases inside the library
420         return my_max_concurrency;
421     }
422 
423     //! Wait for all work in the arena to be completed
424     //! Even submitted by other application threads
425     //! Joins arena if/when possible (in the same way as execute())
426     void debug_wait_until_empty() {
427         initialize();
428         r1::wait(*this);
429     }
430 #endif //__TBB_EXTRA_DEBUG
431 
432     //! Returns the maximal number of threads that can work inside the arena
433     int max_concurrency() const {
434         // Handle special cases inside the library
435         return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
436     }
437 
438     friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
439         __TBB_ASSERT(ta.is_active(), nullptr);
440         call_itt_task_notify(releasing, &t);
441         r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
442     }
443 };
444 
445 //! Executes a mutable functor in isolation within the current task arena.
446 //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
447 template<typename F>
448 inline auto isolate(F&& f) -> decltype(f()) {
449     return isolate_impl<decltype(f())>(f);
450 }
451 
452 //! Returns the index, aka slot number, of the calling thread in its current arena
453 inline int current_thread_index() {
454     slot_id idx = r1::execution_slot(nullptr);
455     return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
456 }
457 
458 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
459 inline bool is_inside_task() {
460     return nullptr != current_context();
461 }
462 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
463 
464 //! Returns the maximal number of threads that can work inside the arena
465 inline int max_concurrency() {
466     return r1::max_concurrency(nullptr);
467 }
468 
469 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
470 inline void enqueue(d2::task_handle&& th) {
471     d2::enqueue_impl(std::move(th), nullptr);
472 }
473 
474 template<typename F>
475 inline void enqueue(F&& f) {
476     enqueue_impl(std::forward<F>(f), nullptr);
477 }
478 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
479 
480 using r1::submit;
481 
482 } // namespace d1
483 } // namespace detail
484 
485 inline namespace v1 {
486 using detail::d1::task_arena;
487 
488 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
489 using detail::d1::is_inside_task;
490 #endif
491 
492 namespace this_task_arena {
493 using detail::d1::current_thread_index;
494 using detail::d1::max_concurrency;
495 using detail::d1::isolate;
496 
497 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
498 using detail::d1::enqueue;
499 #endif
500 } // namespace this_task_arena
501 
502 } // inline namespace v1
503 
504 } // namespace tbb
505 #endif /* __TBB_task_arena_H */
506