1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_task_arena_H 18 #define __TBB_task_arena_H 19 20 #include "detail/_config.h" 21 22 #include "detail/_aligned_space.h" 23 #include "detail/_attach.h" 24 #include "detail/_exception.h" 25 #include "detail/_namespace_injection.h" 26 #include "detail/_small_object_pool.h" 27 #include "detail/_task.h" 28 29 #include "detail/_task_handle.h" 30 31 #if __TBB_ARENA_BINDING 32 #include "info.h" 33 #endif /*__TBB_ARENA_BINDING*/ 34 35 namespace tbb { 36 namespace detail { 37 38 namespace d1 { 39 40 template<typename F, typename R> 41 class task_arena_function : public delegate_base { 42 F &my_func; 43 aligned_space<R> my_return_storage; 44 bool my_constructed{false}; 45 // The function should be called only once. 46 bool operator()() const override { 47 new (my_return_storage.begin()) R(my_func()); 48 return true; 49 } 50 public: 51 task_arena_function(F& f) : my_func(f) {} 52 // The function can be called only after operator() and only once. 
53 R consume_result() { 54 my_constructed = true; 55 return std::move(*(my_return_storage.begin())); 56 } 57 ~task_arena_function() override { 58 if (my_constructed) { 59 my_return_storage.begin()->~R(); 60 } 61 } 62 }; 63 64 template<typename F> 65 class task_arena_function<F,void> : public delegate_base { 66 F &my_func; 67 bool operator()() const override { 68 my_func(); 69 return true; 70 } 71 public: 72 task_arena_function(F& f) : my_func(f) {} 73 void consume_result() const {} 74 75 friend class task_arena_base; 76 }; 77 78 class task_arena_base; 79 class task_scheduler_observer; 80 } // namespace d1 81 82 namespace r1 { 83 class arena; 84 struct task_arena_impl; 85 86 TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool); 87 TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&); 88 TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&); 89 TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&); 90 TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&); 91 TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&); 92 TBB_EXPORT int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*); 93 TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t); 94 95 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*); 96 TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*); 97 TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t); 98 } // namespace r1 99 100 namespace d2 { 101 inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) { 102 __TBB_ASSERT(th != nullptr, "Attempt to schedule empty task_handle"); 103 104 auto& ctx = task_handle_accessor::ctx_of(th); 105 106 // Do not access th after release 107 r1::enqueue(*task_handle_accessor::release(th), ctx, ta); 108 } 109 } //namespace d2 110 111 
namespace d1 {

//! Spacing between the numeric values of task_arena_base::priority levels.
static constexpr int priority_stride = INT_MAX / 4;

//! Settings and state shared by task_arena and the runtime library.
/** NOTE(review): member declaration order defines the object layout seen by
    the compiled library (see the backward-compatibility static_assert below).
    Do not reorder, insert or remove data members. */
class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    //! Arena priority levels; values are multiples of priority_stride.
    enum class priority : int {
        low    = 1 * priority_stride,
        normal = 2 * priority_stride,
        high   = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
protected:
    //! Special settings (bit flags; see core_type_support_flag)
    intptr_t my_version_and_traits;

    //! Initialization state; set to initialized by task_arena::mark_initialized()
    //! and reset to uninitialized by task_arena::terminate().
    std::atomic<do_once_state> my_initialization_state;

    //! nullptr if not currently initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
        "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved slots for external threads
    unsigned my_num_reserved_slots;

    //! Arena priority
    priority my_priority;

    //! The NUMA node index to which the arena will be attached
    numa_node_id my_numa_id;

    //! The core type index to which arena will be attached
    core_type_id my_core_type;

    //! Number of threads per core
    int my_max_threads_per_core;

    // Backward compatibility checks.
    // The following getters return the stored value only when
    // core_type_support_flag is set in my_version_and_traits, and automatic
    // otherwise. Presumably this guards against objects built by older headers
    // that did not have these fields (TODO confirm).
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    //! Bit flags stored in my_version_and_traits.
    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    //! Stores settings only; NUMA node, core type and threads-per-core are set to automatic.
    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
        {}

#if __TBB_ARENA_BINDING
    //! Stores settings taken from constraints_. Core type and threads-per-core
    //! are copied only when the preview constraints extension is present;
    //! otherwise they are set to automatic.
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
#else
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
#endif
        {}
#endif /*__TBB_ARENA_BINDING*/
public:
    //! Sentinel (-1) used as the default for concurrency, NUMA node, core type
    //! and threads-per-core settings (i.e. "choose automatically").
    static const int automatic = -1;
    //! Returned by current_thread_index() when the calling thread has no arena slot.
    static const int not_initialized = -2;
};

//! Runs f inside r1::isolate_within_arena (isolation argument 0) and returns its result.
template<typename R, typename F>
R isolate_impl(F& f) {
    task_arena_function<F, R> func(f);
    r1::isolate_within_arena(func, /*isolation*/ 0);
    return func.consume_result();
}

//! Task that owns a copy of a functor, runs it once, and then deletes itself.
template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator;
    // const: the functor is invoked through a const object, so F::operator() must be const.
    const F m_func;

    //! Releases this task's memory through the allocator it was created with.
    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        // The task must not be touched after finalize(): it destroys *this.
        finalize(ed);
        return nullptr;
    }
    //! Treated as a fatal error. Judging by the message, this path is reached
    //! when an exception escapes the functor (TODO confirm in the runtime).
    task* cancel(execution_data&) override {
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};

//! Allocates an enqueue_task holding a decayed copy/move of f and passes it to
//! r1::enqueue. ta may be nullptr (used by this_task_arena::enqueue).
template<typename F>
void enqueue_impl(F&& f, task_arena_base* ta) {
    small_object_allocator alloc{};
    r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
}
/** 1-to-1 proxy representation class of scheduler's arena
 * Constructors set up settings only, real construction is deferred till the first method invocation
 * Destructor only removes one of the references to the inner arena representation.
 * Final destruction happens when all the references (and the work) are gone.
 */
class task_arena : public task_arena_base {

    //! Publishes the initialized state. The release store pairs with the
    //! acquire load in is_active(). Requires my_arena to be set already.
    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    //! Ensures initialization, runs f via r1::execute, and returns its result.
    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    //! Creates task_arena with certain concurrency limits
    /** Sets up settings only, real construction is deferred till the first method invocation
     *  @arg max_concurrency specifies total number of slots in arena where threads work
     *  @arg reserved_for_masters specifies number of slots to be used by external threads only.
     *       Value of 1 is default and reflects behavior of implicit arenas.
     **/
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}

#if __TBB_ARENA_BINDING
    //! Creates task arena pinned to certain NUMA node
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}

    //! Copies settings from another task_arena
    //! The new object starts uninitialized; s's arena reference is not shared.
    task_arena(const task_arena &s) // copy settings but not the reference or instance
        : task_arena_base(
            constraints{}
                .set_numa_id(s.my_numa_id)
                .set_max_concurrency(s.my_max_concurrency)
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
                .set_core_type(s.my_core_type)
                .set_max_threads_per_core(s.my_max_threads_per_core)
#endif
            , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    //! Copies settings from another task_arena
    //! The new object starts uninitialized; a's arena reference is not shared.
    task_arena(const task_arena& a) // copy settings but not the reference or instance
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif /*__TBB_ARENA_BINDING*/

    //! Tag class used to indicate the "attaching" constructor
    struct attach {};

    //! Creates an instance of task_arena attached to the current arena of the thread
    //! If r1::attach fails, the object stays uninitialized with default settings.
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
    {
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }

    //! Creates an instance of task_arena attached to the current arena of the thread
    //! Accepts the namespace-level d1::attach tag; delegates to the constructor above.
    explicit task_arena(d1::attach)
        : task_arena(attach{})
    {}

    //! Forces allocation of the resources for the task_arena as specified in constructor arguments
    //! Uses atomic_do_once on my_initialization_state (defined elsewhere). Presumably
    //! r1::initialize therefore runs at most once even with concurrent callers (TODO confirm).
    void initialize() {
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    //! Overrides concurrency level and forces initialization of internal representation
    //! Does nothing if already active. NUMA/core-type settings are left unchanged.
    //! Not synchronized through atomic_do_once, unlike initialize().
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }

#if __TBB_ARENA_BINDING
    //! Overrides settings from constraints_ and forces initialization; does nothing if already active.
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
#endif
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif /*__TBB_ARENA_BINDING*/

    //! Attaches this instance to the current arena of the thread
    //! Falls back to r1::initialize with the stored settings if attaching fails.
    void initialize(attach) {
        // TODO: decide if this call must be thread-safe
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    //! Attaches this instance to the current arena of the thread
    void initialize(d1::attach) {
        initialize(attach{});
    }

    //! Removes the reference to the internal arena representation.
    //! Not thread safe wrt concurrent invocations of other methods.
    //! After this call is_active() returns false; settings are kept, so the
    //! object can be initialized again.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    //! Removes the reference to the internal arena representation, and destroys the external object.
    //! Not thread safe wrt concurrent invocations of other methods.
    ~task_arena() {
        terminate();
    }

    //! Returns true if the arena is active (initialized); false otherwise.
    //! The name was chosen to match a task_scheduler_init method with the same semantics.
    bool is_active() const {
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    //! Enqueues a task into the arena to process a functor, and immediately returns.
    //! Does not require the calling thread to join the arena
    //! Initializes the arena first if needed.
    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }

    //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
    //! Does not require the calling thread to join the arena
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }

    //! Joins the arena and executes a mutable functor, then returns
    //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
    //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
    //! Returns the value returned by the functor (decltype(f())); f is invoked as an lvalue.
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }

#if __TBB_EXTRA_DEBUG
    //! Returns my_num_reserved_slots
    int debug_reserved_slots() const {
        // Raw stored value (converted from unsigned); no special cases are resolved here.
        return my_num_reserved_slots;
    }

    //! Returns my_max_concurrency
    int debug_max_concurrency() const {
        // Raw stored value (may be automatic); unlike max_concurrency(), nothing is resolved here.
        return my_max_concurrency;
    }

    //! Wait for all work in the arena to be completed
    //! Even submitted by other application threads
    //! Joins arena if/when possible (in the same way as execute())
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif //__TBB_EXTRA_DEBUG

    //! Returns the maximal number of threads that can work inside the arena
    int max_concurrency() const {
        // Handle special cases inside the library
        // Values > 1 are returned as stored. Anything else (automatic, 1, ...)
        // is resolved by r1::max_concurrency.
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }

    //! Submits t with context ctx directly to ta's arena. ta must be active.
    //! as_critical is passed to the runtime as 1/0.
    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};

//! Executes a mutable functor in isolation within the current task arena.
//! Returns the value returned by the functor (decltype(f())).
template<typename F>
inline auto isolate(F&& f) -> decltype(f()) {
    return isolate_impl<decltype(f())>(f);
}

//! Returns the index, aka slot number, of the calling thread in its current arena
//! or task_arena_base::not_initialized if the runtime reports slot_id(-1).
inline int current_thread_index() {
    slot_id idx = r1::execution_slot(nullptr);
    return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! True when current_context() is non-null.
inline bool is_inside_task() {
    return nullptr != current_context();
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Returns the maximal number of threads that can work inside the arena
//! (queried from the runtime with a null arena, i.e. the calling thread's context).
inline int max_concurrency() {
    return r1::max_concurrency(nullptr);
}

//! Enqueues the task held by th with a null arena pointer (this_task_arena::enqueue).
inline void enqueue(d2::task_handle&& th) {
    d2::enqueue_impl(std::move(th), nullptr);
}

//! Enqueues a functor with a null arena pointer (this_task_arena::enqueue).
template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), nullptr);
}

using r1::submit;

} // namespace d1
} // namespace detail

inline namespace v1 {
using detail::d1::task_arena;
using detail::d1::attach;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::is_inside_task;
#endif

namespace this_task_arena {
using detail::d1::current_thread_index;
using detail::d1::max_concurrency;
using detail::d1::isolate;

using detail::d1::enqueue;
} // namespace this_task_arena

} // inline namespace v1

} // namespace tbb
#endif /* __TBB_task_arena_H */