/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_ARENA_BINDING*/

void arena::on_thread_leaving(unsigned ref_param) {
    //
    // The implementation of the arena destruction synchronization logic contained
    // various bugs/flaws at different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the framework of the
    // current design.
    //
    // In the case of fire-and-forget tasks (scheduled via task::enqueue()), the
    // external thread is allowed to leave its arena before all of its work is executed,
    // and the market may temporarily revoke all workers from this arena.
    // Since revoked workers never attempt to reset the arena state to EMPTY or cancel
    // its request to RML for threads, the arena object is destroyed only when the last
    // thread leaves it and the arena's state is EMPTY (that is, its external thread
    // has left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should
    // not be done here (or anywhere else in the external thread for that matter); doing
    // so can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing worker migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot-taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may
    // arrive, finish the remaining work, set the arena state to empty, and leave,
    // decrementing the refcount and destroying the arena. Then the current thread
    // would destroy the arena a second time. To preclude this, a local copy of the
    // outstanding request value can be stored before decrementing the active threads
    // count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish a non-zero request. Then all the threads can be
    // revoked (as described above), leaving this thread the last one and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by the reference this
    // thread holds to it. In the case of workers, the market's liveness is ensured by
    // the RML connection rundown protocol, according to which the client (i.e. the
    // market) lives until the RML server notifies it about connection termination, and
    // this notification is fired only after all workers return to RML.
    //
    // Thus, if we decremented the refcount to zero, we ask the market to check the
    // arena state (including whether it is still alive) under the lock.
    //

    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");
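
    // To illustrate why `this` must not be inspected once our reference is released,
    // consider the following interleaving (illustrative only; the real decision is
    // delegated to the threading control below, which re-checks the arena under its
    // own lock):
    //
    //     // this thread (external)          // last worker
    //     my_references -= ref_external;
    //                                        finds no work, sees the refcount drop
    //                                        to zero, and frees the arena
    //     if (is_empty()) ...                // would dereference a dead arena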

    // When there are no workers, someone must free the arena, since
    // without workers no one calls out_of_work().
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // Do not access `this`; it might already be destroyed.
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourselves
            free_arena();
        }
    }
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start the search for an empty slot from the one we occupied last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // First, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Second, all threads try to occupy the non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Wait on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For the purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (the arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: one for the external thread and one for a worker (required to support starvation-resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                             unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup the coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT(my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // We should try to clear my_mandatory_concurrency first to keep the arena invariants consistent.
    // Otherwise, we might end up with my_pool_state == false and my_mandatory_concurrency == true,
    // which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We had set workers_delta to 1 when we enabled mandatory concurrency, so revert it now
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

int arena::update_concurrency(unsigned allotment) {
    // Compute the delta against the previous allotment before it is overwritten;
    // reading the field back after set_allotment() would always yield zero.
    int delta = static_cast<int>(allotment) - static_cast<int>(my_num_workers_allotted.load(std::memory_order_relaxed));
    set_allotment(allotment);
    return delta;
}

std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate the min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate the max request
    my_total_num_workers_requested += workers_delta;
    // Clamp the worker request to the interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add a public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // We should not publish arena until all fields are initialized
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still in use by the current thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // Increase the threading_control's ref count on behalf of the task_arena
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside of the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!
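
            // A sketch of how this notification pairs with the waiting side in
            // task_arena_impl::execute() (simplified; see the prepare_wait/commit_wait
            // loop there for the exact waiter protocol):
            //
            //     // waiting thread (execute)              // leaving thread (here)
            //     my_exit_monitors.prepare_wait(waiter);
            //     occupy_free_slot(...) fails              my_arena_slot->release();
            //     my_exit_monitors.commit_wait(waiter); <- my_exit_monitors.notify_one();
            //     retries occupying the freed slot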

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called earlier than the m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race we wait for the m_completed signal.
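        // The race, roughly (hypothetical interleaving; this task lives on the stack
        // of task_arena_impl::execute() on the waiting thread):
        //
        //     // executing thread (finalize)       // thread waiting in execute()
        //     m_wait_ctx.release();
        //                                          r1::wait() returns; the stack frame
        //                                          holding this delegated_task is about
        //                                          to be unwound
        //     m_monitor.notify(...);               // would touch a destroyed object
        //                                          // without the spin wait below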
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            std::size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It is restored in the on_completion handler below.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb
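
// For reference, the exported entry points above back the public task_arena API declared
// in oneapi/tbb/task_arena.h. A typical usage sketch on the user side (illustrative only,
// not part of this translation unit):
//
//     #include <oneapi/tbb/task_arena.h>
//     #include <oneapi/tbb/parallel_for.h>
//
//     tbb::task_arena limited(4); // at most 4 threads, one slot reserved for the caller
//     limited.execute([] {
//         tbb::parallel_for(0, 1000, [](int) { /* work */ });
//     });
//
// task_arena::execute() lands in task_arena_impl::execute() above, task_arena::enqueue()
// in task_arena_impl::enqueue(), and this_task_arena::isolate() in isolate_within_arena().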