/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_task_arena_H
#define __TBB_task_arena_H

#include "detail/_namespace_injection.h"
#include "detail/_task.h"
#include "detail/_exception.h"
#include "detail/_aligned_space.h"
#include "detail/_small_object_pool.h"

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
#include "detail/_task_handle.h"
#endif

#if __TBB_ARENA_BINDING
#include "info.h"
#endif /*__TBB_ARENA_BINDING*/

namespace tbb {
namespace detail {

namespace d1 {

//! Adapts a user functor returning R to the delegate_base interface.
/** An instance is passed to r1::execute() or r1::isolate_within_arena() (see execute_impl and
    isolate_impl below). When invoked, operator() constructs the functor's result in place in
    my_return_storage; the caller then retrieves it with consume_result().
    The functor is held by reference, so it must outlive this object. **/
template<typename F, typename R>
class task_arena_function : public delegate_base {
    //! Functor to run; referenced, not copied.
    F &my_func;
    //! Raw storage that operator() placement-constructs the result into.
    aligned_space<R> my_return_storage;
    //! Set by consume_result(); tells the destructor that the (moved-from) result must be destroyed.
    // NOTE(review): if operator() ran but consume_result() is never called (e.g. an exception
    // propagates between the two), the R constructed in my_return_storage is never destroyed.
    // Confirm that callers always pair the two.
    bool my_constructed{false};
    // The function should be called only once.
    // A second call would construct a new R over the previous one without destroying it.
    bool operator()() const override {
        new (my_return_storage.begin()) R(my_func());
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    // The function can be called only after operator() and only once.
    // Moves the stored result out; the moved-from object is destroyed later by the destructor.
    R consume_result() {
        my_constructed = true;
        return std::move(*(my_return_storage.begin()));
    }
    ~task_arena_function() override {
        if (my_constructed) {
            my_return_storage.begin()->~R();
        }
    }
};

//! Specialization for functors returning void: nothing is stored and consume_result() is a no-op.
template<typename F>
class task_arena_function<F,void> : public delegate_base {
    F &my_func;
    bool operator()() const override {
        my_func();
        return true;
    }
public:
    task_arena_function(F& f) : my_func(f) {}
    void consume_result() const {}

    friend class task_arena_base;
};

class task_arena_base;
class task_scheduler_observer;
} // namespace d1

// Library entry points used by this header. They are only declared here (marked TBB_EXPORT);
// their definitions live outside this file.
namespace r1 {
class arena;
struct task_arena_impl;

TBB_EXPORT void __TBB_EXPORTED_FUNC observe(d1::task_scheduler_observer&, bool);
TBB_EXPORT void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base&);
TBB_EXPORT bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC execute(d1::task_arena_base&, d1::delegate_base&);
TBB_EXPORT void __TBB_EXPORTED_FUNC wait(d1::task_arena_base&);
TBB_EXPORT int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC isolate_within_arena(d1::delegate_base& d, std::intptr_t);

TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC enqueue(d1::task&, d1::task_group_context&, d1::task_arena_base*);
TBB_EXPORT void __TBB_EXPORTED_FUNC submit(d1::task&, d1::task_group_context&, arena*, std::uintptr_t);
} // namespace r1

namespace d2 {
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Enqueues the task owned by th into arena ta via r1::enqueue().
/** Calls throw_exception(exception_id::bad_task_handle) if th is empty.
    Shared by task_arena::enqueue(task_handle&&) (ta == this) and
    this_task_arena::enqueue(task_handle&&) (ta == nullptr). **/
inline void enqueue_impl(task_handle&& th, d1::task_arena_base* ta) {
    if (th == nullptr) {
        throw_exception(exception_id::bad_task_handle);
    }

    // Fetch the context first: th must not be touched once the task has been released.
    auto& ctx = task_handle_accessor::ctx_of(th);

    // Do not access th after release
    r1::enqueue(*task_handle_accessor::release(th), ctx, ta);
}
#endif// __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

} // namespace d2
namespace d1 {

//! Spacing between arena priority levels: low, normal and high are 1x, 2x and 3x this value.
static constexpr int priority_stride = INT_MAX / 4;

//! Settings and library-visible state of a task_arena.
/** Constructors only record settings. The arena itself is created by r1::initialize() or bound
    by r1::attach(), both driven from the derived task_arena. r1::task_arena_impl and
    r1::observe() are friends and read the protected fields directly.
    NOTE(review): the static_assert on my_arena and the friend library functions suggest this
    field layout is shared with the compiled library (presumably ABI) -- do not reorder, add or
    resize fields without confirming. **/
class task_arena_base {
    friend struct r1::task_arena_impl;
    friend void r1::observe(d1::task_scheduler_observer&, bool);
public:
    //! Arena priority; every value is a multiple of priority_stride.
    enum class priority : int {
        low = 1 * priority_stride,
        normal = 2 * priority_stride,
        high = 3 * priority_stride
    };
#if __TBB_ARENA_BINDING
    using constraints = tbb::detail::d1::constraints;
#endif /*__TBB_ARENA_BINDING*/
protected:
    //! Special settings
    //! Bit flags from the anonymous enum below. All constructors set core_type_support_flag,
    //! which core_type() and max_threads_per_core() check before trusting the matching fields.
    intptr_t my_version_and_traits;

    //! One-time initialization state: advanced by atomic_do_once()/mark_initialized(),
    //! reset by terminate(), and read by is_active() in task_arena.
    std::atomic<do_once_state> my_initialization_state;

    //! nullptr if not currently initialized.
    std::atomic<r1::arena*> my_arena;
    static_assert(sizeof(std::atomic<r1::arena*>) == sizeof(r1::arena*),
        "To preserve backward compatibility we need the equal size of an atomic pointer and a pointer");

    //! Concurrency level for deferred initialization
    int my_max_concurrency;

    //! Reserved slots for external threads
    unsigned my_num_reserved_slots;

    //! Arena priority
    priority my_priority;

    //! The NUMA node index to which the arena will be attached
    numa_node_id my_numa_id;

    //! The core type index to which arena will be attached
    core_type_id my_core_type;

    //! Number of threads per core
    int my_max_threads_per_core;

    // Backward compatibility checks.
    // Each accessor returns the stored value only when core_type_support_flag is set in
    // my_version_and_traits, and reports `automatic` otherwise.
    core_type_id core_type() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_core_type : automatic;
    }
    int max_threads_per_core() const {
        return (my_version_and_traits & core_type_support_flag) == core_type_support_flag ? my_max_threads_per_core : automatic;
    }

    enum {
        default_flags = 0
        , core_type_support_flag = 1
    };

    //! Records settings only; NUMA node, core type and threads per core default to `automatic`.
    task_arena_base(int max_concurrency, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(automatic)
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
    {}

#if __TBB_ARENA_BINDING
    //! Records settings taken from constraints_. Without
    //! __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT the core type and threads-per-core
    //! fields of constraints_ are ignored and set to `automatic`.
    task_arena_base(const constraints& constraints_, unsigned reserved_for_masters, priority a_priority)
        : my_version_and_traits(default_flags | core_type_support_flag)
        , my_initialization_state(do_once_state::uninitialized)
        , my_arena(nullptr)
        , my_max_concurrency(constraints_.max_concurrency)
        , my_num_reserved_slots(reserved_for_masters)
        , my_priority(a_priority)
        , my_numa_id(constraints_.numa_id)
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        , my_core_type(constraints_.core_type)
        , my_max_threads_per_core(constraints_.max_threads_per_core)
#else
        , my_core_type(automatic)
        , my_max_threads_per_core(automatic)
#endif
    {}
#endif /*__TBB_ARENA_BINDING*/
public:
    //! Sentinel meaning "let the library decide". Used for concurrency, NUMA node, core type
    //! and threads-per-core settings.
    static const int automatic = -1;
    //! Returned by current_thread_index() when the library reports no slot for the calling thread.
    static const int not_initialized = -2;
};

//! Runs f through r1::isolate_within_arena() (isolation argument 0) and returns f's result.
template<typename R, typename F>
R isolate_impl(F& f) {
    task_arena_function<F, R> func(f);
    r1::isolate_within_arena(func, /*isolation*/ 0);
    return func.consume_result();
}

//! Self-deleting task that runs a stored copy of a functor once.
/** Allocated by enqueue_impl() through a small_object_allocator and freed with the same
    allocator right after the functor returns. The functor is stored as `const F`, so it must be
    callable on a const object. **/
template <typename F>
class enqueue_task : public task {
    small_object_allocator m_allocator;
    const F m_func;

    //! Destroys and deallocates this task; *this must not be accessed afterwards.
    void finalize(const execution_data& ed) {
        m_allocator.delete_object(this, ed);
    }
    task* execute(execution_data& ed) override {
        m_func();
        finalize(ed);
        return nullptr;
    }
    // Unconditionally fails __TBB_ASSERT_RELEASE; its message indicates this path is taken when
    // an exception escapes m_func().
    task* cancel(execution_data&) override {
        __TBB_ASSERT_RELEASE(false, "Unhandled exception from enqueue task is caught");
        return nullptr;
    }
public:
    enqueue_task(const F& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(f) {}
    enqueue_task(F&& f, small_object_allocator& alloc) : m_allocator(alloc), m_func(std::move(f)) {}
};

//! Wraps a decayed copy of f in an enqueue_task and passes it to r1::enqueue() for arena ta.
/** ta is `this` when called from task_arena::enqueue() and nullptr when called from
    this_task_arena::enqueue() (presumably meaning the caller's current arena). **/
template<typename F>
void enqueue_impl(F&& f, task_arena_base* ta) {
    small_object_allocator alloc{};
    r1::enqueue(*alloc.new_object<enqueue_task<typename std::decay<F>::type>>(std::forward<F>(f), alloc), ta);
}
/** 1-to-1 proxy representation class of scheduler's arena
 * Constructors set up settings only, real construction is deferred till the first method invocation
 * Destructor only removes one of the references to the inner arena representation.
 * Final destruction happens when all the references (and the work) are gone.
 */
class task_arena : public task_arena_base {

    //! Publishes the initialized state. The release store pairs with the acquire load in
    //! is_active(), so a thread that sees the arena as active also sees my_arena set.
    void mark_initialized() {
        __TBB_ASSERT( my_arena.load(std::memory_order_relaxed), "task_arena initialization is incomplete" );
        my_initialization_state.store(do_once_state::initialized, std::memory_order_release);
    }

    //! Lazily initializes the arena, runs f through r1::execute() and returns f's result.
    template<typename R, typename F>
    R execute_impl(F& f) {
        initialize();
        task_arena_function<F, R> func(f);
        r1::execute(*this, func);
        return func.consume_result();
    }
public:
    //! Creates task_arena with certain concurrency limits
    /** Sets up settings only, real construction is deferred till the first method invocation
     *  @arg max_concurrency specifies total number of slots in arena where threads work
     *  @arg reserved_for_masters specifies number of slots to be used by external threads only.
     *       Value of 1 is default and reflects behavior of implicit arenas.
     **/
    task_arena(int max_concurrency_ = automatic, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(max_concurrency_, reserved_for_masters, a_priority)
    {}

#if __TBB_ARENA_BINDING
    //! Creates task arena pinned to certain NUMA node
    task_arena(const constraints& constraints_, unsigned reserved_for_masters = 1,
               priority a_priority = priority::normal)
        : task_arena_base(constraints_, reserved_for_masters, a_priority)
    {}

    //! Copies settings from another task_arena
    //! The new object starts uninitialized even if s is active.
    task_arena(const task_arena &s) // copy settings but not the reference or instance
        : task_arena_base(
            constraints{}
                .set_numa_id(s.my_numa_id)
                .set_max_concurrency(s.my_max_concurrency)
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
                .set_core_type(s.my_core_type)
                .set_max_threads_per_core(s.my_max_threads_per_core)
#endif
            , s.my_num_reserved_slots, s.my_priority)
    {}
#else
    //! Copies settings from another task_arena
    //! The new object starts uninitialized even if a is active.
    task_arena(const task_arena& a) // copy settings but not the reference or instance
        : task_arena_base(a.my_max_concurrency, a.my_num_reserved_slots, a.my_priority)
    {}
#endif /*__TBB_ARENA_BINDING*/

    //! Tag class used to indicate the "attaching" constructor
    struct attach {};

    //! Creates an instance of task_arena attached to the current arena of the thread
    //! If r1::attach() fails, the object keeps default settings and stays inactive; it is then
    //! initialized lazily by the first method that calls initialize().
    explicit task_arena( attach )
        : task_arena_base(automatic, 1, priority::normal) // use default settings if attach fails
    {
        if (r1::attach(*this)) {
            mark_initialized();
        }
    }

    //! Forces allocation of the resources for the task_arena as specified in constructor arguments
    //! Goes through atomic_do_once on my_initialization_state, so r1::initialize() is intended to
    //! run only once (presumably safe under concurrent calls -- see atomic_do_once).
    void initialize() {
        atomic_do_once([this]{ r1::initialize(*this); }, my_initialization_state);
    }

    //! Overrides concurrency level and forces initialization of internal representation
    //! The new settings take effect only if the arena is not yet active; otherwise the call only
    //! performs the __TBB_ASSERT check. NOTE(review): unlike initialize(), this overload does not
    //! use atomic_do_once, so it is not guarded against concurrent initialization.
    void initialize(int max_concurrency_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_max_concurrency = max_concurrency_;
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }

#if __TBB_ARENA_BINDING
    //! Overrides settings from constraints_ and forces initialization of internal representation.
    //! Same activation rules and NOTE(review) as the overload taking max_concurrency_.
    void initialize(constraints constraints_, unsigned reserved_for_masters = 1,
                    priority a_priority = priority::normal)
    {
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            my_numa_id = constraints_.numa_id;
            my_max_concurrency = constraints_.max_concurrency;
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
            my_core_type = constraints_.core_type;
            my_max_threads_per_core = constraints_.max_threads_per_core;
#endif
            my_num_reserved_slots = reserved_for_masters;
            my_priority = a_priority;
            r1::initialize(*this);
            mark_initialized();
        }
    }
#endif /*__TBB_ARENA_BINDING*/

    //! Attaches this instance to the current arena of the thread
    //! Falls back to r1::initialize() with the stored settings when r1::attach() returns false.
    void initialize(attach) {
        // TODO: decide if this call must be thread-safe
        __TBB_ASSERT(!my_arena.load(std::memory_order_relaxed), "Impossible to modify settings of an already initialized task_arena");
        if( !is_active() ) {
            if ( !r1::attach(*this) ) {
                r1::initialize(*this);
            }
            mark_initialized();
        }
    }

    //! Removes the reference to the internal arena representation.
    //! Not thread safe wrt concurrent invocations of other methods.
    //! Afterwards is_active() returns false and the object can be initialized again.
    void terminate() {
        if( is_active() ) {
            r1::terminate(*this);
            my_initialization_state.store(do_once_state::uninitialized, std::memory_order_relaxed);
        }
    }

    //! Removes the reference to the internal arena representation, and destroys the external object.
    //! Not thread safe wrt concurrent invocations of other methods.
    ~task_arena() {
        terminate();
    }

    //! Returns true if the arena is active (initialized); false otherwise.
    //! The name was chosen to match a task_scheduler_init method with the same semantics.
    bool is_active() const {
        return my_initialization_state.load(std::memory_order_acquire) == do_once_state::initialized;
    }

    //! Enqueues a task into the arena to process a functor, and immediately returns.
    //! Does not require the calling thread to join the arena
    //! A decayed copy of f is stored in the task; initializes the arena first if needed.
    template<typename F>
    void enqueue(F&& f) {
        initialize();
        enqueue_impl(std::forward<F>(f), this);
    }

    //! Enqueues a task into the arena to process a functor wrapped in task_handle, and immediately returns.
    //! Does not require the calling thread to join the arena
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
    void enqueue(d2::task_handle&& th) {
        initialize();
        d2::enqueue_impl(std::move(th), this);
    }
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

    //! Joins the arena and executes a mutable functor, then returns
    //! If not possible to join, wraps the functor into a task, enqueues it and waits for task completion
    //! Can decrement the arena demand for workers, causing a worker to leave and free a slot to the calling thread
    //! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
    //! f is forwarded as an lvalue reference (it is not copied); the return type is decltype(f()).
    template<typename F>
    auto execute(F&& f) -> decltype(f()) {
        return execute_impl<decltype(f())>(f);
    }

#if __TBB_EXTRA_DEBUG
    //! Returns my_num_reserved_slots
    int debug_reserved_slots() const {
        // Handle special cases inside the library
        return my_num_reserved_slots;
    }

    //! Returns my_max_concurrency
    int debug_max_concurrency() const {
        // Handle special cases inside the library
        return my_max_concurrency;
    }

    //! Wait for all work in the arena to be completed
    //! Even submitted by other application threads
    //! Joins arena if/when possible (in the same way as execute())
    void debug_wait_until_empty() {
        initialize();
        r1::wait(*this);
    }
#endif //__TBB_EXTRA_DEBUG

    //! Returns the maximal number of threads that can work inside the arena
    //! Settings above 1 are returned as stored; 1 or less (including `automatic`) is resolved by
    //! r1::max_concurrency(this).
    int max_concurrency() const {
        // Handle special cases inside the library
        return (my_max_concurrency > 1) ? my_max_concurrency : r1::max_concurrency(this);
    }

    //! Submits t to ta's arena under context ctx; ta must already be active (asserted).
    //! Emits an ITT "releasing" notification for t first; as_critical is forwarded to the library as 1/0.
    friend void submit(task& t, task_arena& ta, task_group_context& ctx, bool as_critical) {
        __TBB_ASSERT(ta.is_active(), nullptr);
        call_itt_task_notify(releasing, &t);
        r1::submit(t, ctx, ta.my_arena.load(std::memory_order_relaxed), as_critical ? 1 : 0);
    }
};

//! Executes a mutable functor in isolation within the current task arena.
//! Since C++11, the method returns the value returned by functor (prior to C++11 it returns void).
template<typename F>
inline auto isolate(F&& f) -> decltype(f()) {
    return isolate_impl<decltype(f())>(f);
}

//! Returns the index, aka slot number, of the calling thread in its current arena
//! Returns task_arena_base::not_initialized when the library reports slot_id(-1).
inline int current_thread_index() {
    slot_id idx = r1::execution_slot(nullptr);
    return idx == slot_id(-1) ? task_arena_base::not_initialized : int(idx);
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Returns true when current_context() is non-null.
inline bool is_inside_task() {
    return nullptr != current_context();
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Returns the maximal number of threads that can work inside the arena
//! Queries the library with a null arena pointer (presumably the calling thread's current arena).
inline int max_concurrency() {
    return r1::max_concurrency(nullptr);
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Enqueues the task owned by th with a null arena pointer (see d2::enqueue_impl).
inline void enqueue(d2::task_handle&& th) {
    d2::enqueue_impl(std::move(th), nullptr);
}

//! Enqueues a decayed copy of f with a null arena pointer (see enqueue_impl).
template<typename F>
inline void enqueue(F&& f) {
    enqueue_impl(std::forward<F>(f), nullptr);
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

// Makes the library-level submit overload visible in d1 next to the friend
// submit(task&, task_arena&, task_group_context&, bool) declared in task_arena.
using r1::submit;

} // namespace d1
} // namespace detail

inline namespace v1 {
using detail::d1::task_arena;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::is_inside_task;
#endif

namespace this_task_arena {
using detail::d1::current_thread_index;
using detail::d1::max_concurrency;
using detail::d1::isolate;

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
using detail::d1::enqueue;
#endif
} // namespace this_task_arena

} // inline namespace v1

} // namespace tbb
#endif /* __TBB_task_arena_H */