1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "task_dispatcher.h" 18 #include "governor.h" 19 #include "threading_control.h" 20 #include "arena.h" 21 #include "itt_notify.h" 22 #include "semaphore.h" 23 #include "waiters.h" 24 #include "oneapi/tbb/detail/_task.h" 25 #include "oneapi/tbb/info.h" 26 #include "oneapi/tbb/tbb_allocator.h" 27 28 #include <atomic> 29 #include <cstring> 30 #include <functional> 31 32 namespace tbb { 33 namespace detail { 34 namespace r1 { 35 36 #if __TBB_ARENA_BINDING 37 class numa_binding_observer : public tbb::task_scheduler_observer { 38 binding_handler* my_binding_handler; 39 public: 40 numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) 41 : task_scheduler_observer(*ta) 42 , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core)) 43 {} 44 45 void on_scheduler_entry( bool ) override { 46 apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index()); 47 } 48 49 void on_scheduler_exit( bool ) override { 50 restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index()); 51 } 52 53 ~numa_binding_observer() override{ 54 destroy_binding_handler(my_binding_handler); 55 } 56 }; 57 58 numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) { 59 numa_binding_observer* binding_observer = nullptr; 60 if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) { 61 binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core); 62 __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction"); 63 binding_observer->observe(true); 64 } 65 return binding_observer; 66 } 67 68 void destroy_binding_observer( numa_binding_observer* binding_observer ) { 69 __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer"); 70 binding_observer->observe(false); 71 binding_observer->~numa_binding_observer(); 72 deallocate_memory(binding_observer); 73 } 74 #endif /*!__TBB_ARENA_BINDING*/ 75 76 void arena::on_thread_leaving(unsigned ref_param) { 77 // 78 // Implementation of arena destruction synchronization logic contained various 79 // bugs/flaws at the different stages of its evolution, so below is a detailed 80 // description of the issues taken into consideration in the framework of the 81 // current design. 82 // 83 // In case of using fire-and-forget tasks (scheduled via task::enqueue()) 84 // external thread is allowed to leave its arena before all its work is executed, 85 // and market may temporarily revoke all workers from this arena. Since revoked 86 // workers never attempt to reset arena state to EMPTY and cancel its request 87 // to RML for threads, the arena object is destroyed only when both the last 88 // thread is leaving it and arena's state is EMPTY (that is its external thread 89 // left and it does not contain any work). 90 // Thus resetting arena to EMPTY state (as earlier TBB versions did) should not 91 // be done here (or anywhere else in the external thread to that matter); doing so 92 // can result either in arena's premature destruction (at least without 93 // additional costly checks in workers) or in unnecessary arena state changes 94 // (and ensuing workers migration). 95 // 96 // A worker that checks for work presence and transitions arena to the EMPTY 97 // state (in snapshot taking procedure arena::out_of_work()) updates 98 // arena::my_pool_state first and only then arena::my_num_workers_requested. 99 // So the check for work absence must be done against the latter field. 100 // 101 // In a time window between decrementing the active threads count and checking 102 // if there is an outstanding request for workers. New worker thread may arrive, 103 // finish remaining work, set arena state to empty, and leave decrementing its 104 // refcount and destroying. Then the current thread will destroy the arena 105 // the second time. To preclude it a local copy of the outstanding request 106 // value can be stored before decrementing active threads count. 107 // 108 // But this technique may cause two other problem. When the stored request is 109 // zero, it is possible that arena still has threads and they can generate new 110 // tasks and thus re-establish non-zero requests. Then all the threads can be 111 // revoked (as described above) leaving this thread the last one, and causing 112 // it to destroy non-empty arena. 113 // 114 // The other problem takes place when the stored request is non-zero. Another 115 // thread may complete the work, set arena state to empty, and leave without 116 // arena destruction before this thread decrements the refcount. This thread 117 // cannot destroy the arena either. Thus the arena may be "orphaned". 118 // 119 // In both cases we cannot dereference arena pointer after the refcount is 120 // decremented, as our arena may already be destroyed. 121 // 122 // If this is the external thread, the market is protected by refcount to it. 123 // In case of workers market's liveness is ensured by the RML connection 124 // rundown protocol, according to which the client (i.e. the market) lives 125 // until RML server notifies it about connection termination, and this 126 // notification is fired only after all workers return into RML. 127 // 128 // Thus if we decremented refcount to zero we ask the market to check arena 129 // state (including the fact if it is alive) under the lock. 130 // 131 132 __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter"); 133 134 // When there is no workers someone must free arena, as 135 // without workers, no one calls out_of_work(). 136 if (ref_param == ref_external && !my_mandatory_concurrency.test()) { 137 out_of_work(); 138 } 139 140 threading_control* tc = my_threading_control; 141 auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client); 142 // Release our reference to sync with destroy_client 143 unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param; 144 // do not access `this` it might be destroyed already 145 if (remaining_ref == 0) { 146 if (tc->try_destroy_client(tc_client_snapshot)) { 147 // We are requested to destroy ourself 148 free_arena(); 149 } 150 } 151 } 152 153 std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) { 154 if ( lower >= upper ) return out_of_arena; 155 // Start search for an empty slot from the one we occupied the last time 156 std::size_t index = tls.my_arena_index; 157 if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower; 158 __TBB_ASSERT( index >= lower && index < upper, nullptr); 159 // Find a free slot 160 for ( std::size_t i = index; i < upper; ++i ) 161 if (my_slots[i].try_occupy()) return i; 162 for ( std::size_t i = lower; i < index; ++i ) 163 if (my_slots[i].try_occupy()) return i; 164 return out_of_arena; 165 } 166 167 template <bool as_worker> 168 std::size_t arena::occupy_free_slot(thread_data& tls) { 169 // Firstly, external threads try to occupy reserved slots 170 std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots ); 171 if ( index == out_of_arena ) { 172 // Secondly, all threads try to occupy all non-reserved slots 173 index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots ); 174 // Likely this arena is already saturated 175 if ( index == out_of_arena ) 176 return out_of_arena; 177 } 178 179 atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() ); 180 return index; 181 } 182 183 std::uintptr_t arena::calculate_stealing_threshold() { 184 stack_anchor_type anchor; 185 return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size()); 186 } 187 188 void arena::process(thread_data& tls) { 189 governor::set_thread_data(tls); // TODO: consider moving to create_one_job. 190 __TBB_ASSERT( is_alive(my_guard), nullptr); 191 __TBB_ASSERT( my_num_slots >= 1, nullptr); 192 193 std::size_t index = occupy_free_slot</*as_worker*/true>(tls); 194 if (index == out_of_arena) { 195 on_thread_leaving(ref_worker); 196 return; 197 } 198 199 __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" ); 200 tls.attach_arena(*this, index); 201 // worker thread enters the dispatch loop to look for a work 202 tls.my_inbox.set_is_idle(true); 203 if (tls.my_arena_slot->is_task_pool_published()) { 204 tls.my_inbox.set_is_idle(false); 205 } 206 207 task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher(); 208 tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold()); 209 __TBB_ASSERT(task_disp.can_steal(), nullptr); 210 211 __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" ); 212 my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker); 213 214 // Waiting on special object tied to this arena 215 outermost_worker_waiter waiter(*this); 216 d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter); 217 // For purposes of affinity support, the slot's mailbox is considered idle while no thread is 218 // attached to it. 219 tls.my_inbox.set_is_idle(true); 220 221 __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task"); 222 __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr); 223 __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr); 224 225 my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker); 226 tls.my_last_observer = nullptr; 227 228 tls.leave_task_dispatcher(); 229 230 // Arena slot detach (arena may be used in market::process) 231 // TODO: Consider moving several calls below into a new method(e.g.detach_arena). 232 tls.my_arena_slot->release(); 233 tls.my_arena_slot = nullptr; 234 tls.my_inbox.detach(); 235 __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr); 236 __TBB_ASSERT(is_alive(my_guard), nullptr); 237 238 // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible 239 // that arena may be temporarily left unpopulated by threads. See comments in 240 // arena::on_thread_leaving() for more details. 241 on_thread_leaving(ref_worker); 242 __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join"); 243 } 244 245 arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) { 246 __TBB_ASSERT( !my_guard, "improperly allocated arena?" ); 247 __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" ); 248 __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" ); 249 my_threading_control = control; 250 my_limit = 1; 251 // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks). 252 my_num_slots = num_arena_slots(num_slots, num_reserved_slots); 253 my_num_reserved_slots = num_reserved_slots; 254 my_max_num_workers = num_slots-num_reserved_slots; 255 my_priority_level = priority_level; 256 my_references = ref_external; // accounts for the external thread 257 my_observers.my_arena = this; 258 my_co_cache.init(4 * num_slots); 259 __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr); 260 // Initialize the default context. It should be allocated before task_dispatch construction. 261 my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context))) 262 d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings }; 263 // Construct slots. Mark internal synchronization elements for the tools. 264 task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots); 265 for( unsigned i = 0; i < my_num_slots; ++i ) { 266 // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr); 267 __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr); 268 __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr); 269 mailbox(i).construct(); 270 my_slots[i].init_task_streams(i); 271 my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this); 272 my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed); 273 } 274 my_fifo_task_stream.initialize(my_num_slots); 275 my_resume_task_stream.initialize(my_num_slots); 276 #if __TBB_PREVIEW_CRITICAL_TASKS 277 my_critical_task_stream.initialize(my_num_slots); 278 #endif 279 my_mandatory_requests = 0; 280 } 281 282 arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, 283 unsigned priority_level) 284 { 285 __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" ); 286 __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" ); 287 __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" ); 288 std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots)); 289 unsigned char* storage = (unsigned char*)cache_aligned_allocate(n); 290 // Zero all slots to indicate that they are empty 291 std::memset( storage, 0, n ); 292 293 return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) ) 294 arena(control, num_slots, num_reserved_slots, priority_level); 295 } 296 297 void arena::free_arena () { 298 __TBB_ASSERT( is_alive(my_guard), nullptr); 299 __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" ); 300 __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" ); 301 __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" ); 302 #if __TBB_ARENA_BINDING 303 if (my_numa_binding_observer != nullptr) { 304 destroy_binding_observer(my_numa_binding_observer); 305 my_numa_binding_observer = nullptr; 306 } 307 #endif /*__TBB_ARENA_BINDING*/ 308 poison_value( my_guard ); 309 for ( unsigned i = 0; i < my_num_slots; ++i ) { 310 // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" ); 311 // TODO: understand the assertion and modify 312 // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr); 313 __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty 314 my_slots[i].free_task_pool(); 315 mailbox(i).drain(); 316 my_slots[i].my_default_task_dispatcher->~task_dispatcher(); 317 } 318 __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed"); 319 __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed"); 320 // Cleanup coroutines/schedulers cache 321 my_co_cache.cleanup(); 322 my_default_ctx->~task_group_context(); 323 cache_aligned_deallocate(my_default_ctx); 324 #if __TBB_PREVIEW_CRITICAL_TASKS 325 __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed"); 326 #endif 327 // Clear enfources synchronization with observe(false) 328 my_observers.clear(); 329 330 void* storage = &mailbox(my_num_slots-1); 331 __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr); 332 this->~arena(); 333 #if TBB_USE_ASSERT > 1 334 std::memset( storage, 0, allocation_size(my_num_slots) ); 335 #endif /* TBB_USE_ASSERT */ 336 cache_aligned_deallocate( storage ); 337 } 338 339 bool arena::has_enqueued_tasks() { 340 return !my_fifo_task_stream.empty(); 341 } 342 343 void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) { 344 my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta); 345 346 if (wakeup_threads) { 347 // Notify all sleeping threads that work has appeared in the arena. 348 get_waiting_threads_monitor().notify([&] (market_context context) { 349 return this == context.my_arena_addr; 350 }); 351 } 352 } 353 354 bool arena::has_tasks() { 355 // TODO: rework it to return at least a hint about where a task was found; better if the task itself. 356 std::size_t n = my_limit.load(std::memory_order_acquire); 357 bool tasks_are_available = false; 358 for (std::size_t k = 0; k < n && !tasks_are_available; ++k) { 359 tasks_are_available = !my_slots[k].is_empty(); 360 } 361 tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty(); 362 #if __TBB_PREVIEW_CRITICAL_TASKS 363 tasks_are_available = tasks_are_available || !my_critical_task_stream.empty(); 364 #endif 365 return tasks_are_available; 366 } 367 368 void arena::out_of_work() { 369 // We should try unset my_pool_state first due to keep arena invariants in consistent state 370 // Otherwise, we might have my_pool_state = false and my_mandatory_concurrency = true that is broken invariant 371 bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); }); 372 bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); }); 373 374 if (disable_mandatory || release_workers) { 375 int mandatory_delta = disable_mandatory ? -1 : 0; 376 int workers_delta = release_workers ? -(int)my_max_num_workers : 0; 377 378 if (disable_mandatory && is_arena_workerless()) { 379 // We had set workers_delta to 1 when enabled mandatory concurrency, so revert it now 380 workers_delta = -1; 381 } 382 request_workers(mandatory_delta, workers_delta); 383 } 384 } 385 386 void arena::set_top_priority(bool is_top_priority) { 387 my_is_top_priority.store(is_top_priority, std::memory_order_relaxed); 388 } 389 390 bool arena::is_top_priority() const { 391 return my_is_top_priority.load(std::memory_order_relaxed); 392 } 393 394 bool arena::try_join() { 395 if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) { 396 my_references += arena::ref_worker; 397 return true; 398 } 399 return false; 400 } 401 402 void arena::set_allotment(unsigned allotment) { 403 if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) { 404 my_num_workers_allotted.store(allotment, std::memory_order_relaxed); 405 } 406 } 407 408 std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) { 409 __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr); 410 411 int min_workers_request = 0; 412 int max_workers_request = 0; 413 414 // Calculate min request 415 my_mandatory_requests += mandatory_delta; 416 min_workers_request = my_mandatory_requests > 0 ? 1 : 0; 417 418 // Calculate max request 419 my_total_num_workers_requested += workers_delta; 420 // Clamp worker request into interval [0, my_max_num_workers] 421 max_workers_request = clamp(my_total_num_workers_requested, 0, 422 min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers); 423 424 return { min_workers_request, max_workers_request }; 425 } 426 427 thread_control_monitor& arena::get_waiting_threads_monitor() { 428 return my_threading_control->get_waiting_threads_monitor(); 429 } 430 431 void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) { 432 task_group_context_impl::bind_to(ctx, &td); 433 task_accessor::context(t) = &ctx; 434 task_accessor::isolation(t) = no_isolation; 435 my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) ); 436 advertise_new_work<work_enqueued>(); 437 } 438 439 arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level) 440 { 441 __TBB_ASSERT(num_slots > 0, NULL); 442 __TBB_ASSERT(num_reserved_slots <= num_slots, NULL); 443 // Add public market reference for an external thread/task_arena (that adds an internal reference in exchange). 444 arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level); 445 a.my_tc_client = control->create_client(a); 446 // We should not publish arena until all fields are initialized 447 control->publish_client(a.my_tc_client); 448 return a; 449 } 450 451 } // namespace r1 452 } // namespace detail 453 } // namespace tbb 454 455 // Enable task_arena.h 456 #include "oneapi/tbb/task_arena.h" // task_arena_base 457 458 namespace tbb { 459 namespace detail { 460 namespace r1 { 461 462 #if TBB_USE_ASSERT 463 void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) { 464 bool is_arena_priority_correct = 465 a_priority == tbb::task_arena::priority::high || 466 a_priority == tbb::task_arena::priority::normal || 467 a_priority == tbb::task_arena::priority::low; 468 __TBB_ASSERT( is_arena_priority_correct, 469 "Task arena priority should be equal to one of the predefined values." ); 470 } 471 #else 472 void assert_arena_priority_valid( tbb::task_arena::priority ) {} 473 #endif 474 475 unsigned arena_priority_level( tbb::task_arena::priority a_priority ) { 476 assert_arena_priority_valid( a_priority ); 477 return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride); 478 } 479 480 tbb::task_arena::priority arena_priority( unsigned priority_level ) { 481 auto priority = tbb::task_arena::priority( 482 (d1::num_priority_levels - priority_level) * d1::priority_stride 483 ); 484 assert_arena_priority_valid( priority ); 485 return priority; 486 } 487 488 struct task_arena_impl { 489 static void initialize(d1::task_arena_base&); 490 static void terminate(d1::task_arena_base&); 491 static bool attach(d1::task_arena_base&); 492 static void execute(d1::task_arena_base&, d1::delegate_base&); 493 static void wait(d1::task_arena_base&); 494 static int max_concurrency(const d1::task_arena_base*); 495 static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*); 496 }; 497 498 void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) { 499 task_arena_impl::initialize(ta); 500 } 501 void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) { 502 task_arena_impl::terminate(ta); 503 } 504 bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) { 505 return task_arena_impl::attach(ta); 506 } 507 void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) { 508 task_arena_impl::execute(ta, d); 509 } 510 void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) { 511 task_arena_impl::wait(ta); 512 } 513 514 int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) { 515 return task_arena_impl::max_concurrency(ta); 516 } 517 518 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) { 519 task_arena_impl::enqueue(t, nullptr, ta); 520 } 521 522 void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) { 523 task_arena_impl::enqueue(t, &ctx, ta); 524 } 525 526 void task_arena_impl::initialize(d1::task_arena_base& ta) { 527 // Enforce global market initialization to properly initialize soft limit 528 (void)governor::get_thread_data(); 529 if (ta.my_max_concurrency < 1) { 530 #if __TBB_ARENA_BINDING 531 d1::constraints arena_constraints = d1::constraints{} 532 .set_core_type(ta.core_type()) 533 .set_max_threads_per_core(ta.max_threads_per_core()) 534 .set_numa_id(ta.my_numa_id); 535 ta.my_max_concurrency = (int)default_concurrency(arena_constraints); 536 #else /*!__TBB_ARENA_BINDING*/ 537 ta.my_max_concurrency = (int)governor::default_num_threads(); 538 #endif /*!__TBB_ARENA_BINDING*/ 539 } 540 541 __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized"); 542 unsigned priority_level = arena_priority_level(ta.my_priority); 543 threading_control* thr_control = threading_control::register_public_reference(); 544 arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level); 545 ta.my_arena.store(&a, std::memory_order_release); 546 #if __TBB_CPUBIND_PRESENT 547 a.my_numa_binding_observer = construct_binding_observer( 548 static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core()); 549 #endif /*__TBB_CPUBIND_PRESENT*/ 550 } 551 552 void task_arena_impl::terminate(d1::task_arena_base& ta) { 553 arena* a = ta.my_arena.load(std::memory_order_relaxed); 554 assert_pointer_valid(a); 555 threading_control::unregister_public_reference(/*blocking_terminate=*/false); 556 a->on_thread_leaving(arena::ref_external); 557 ta.my_arena.store(nullptr, std::memory_order_relaxed); 558 } 559 560 bool task_arena_impl::attach(d1::task_arena_base& ta) { 561 __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr); 562 thread_data* td = governor::get_thread_data_if_initialized(); 563 if( td && td->my_arena ) { 564 arena* a = td->my_arena; 565 // There is an active arena to attach to. 566 // It's still used by s, so won't be destroyed right away. 567 __TBB_ASSERT(a->my_references > 0, nullptr); 568 a->my_references += arena::ref_external; 569 ta.my_num_reserved_slots = a->my_num_reserved_slots; 570 ta.my_priority = arena_priority(a->my_priority_level); 571 ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers; 572 __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr); 573 ta.my_arena.store(a, std::memory_order_release); 574 // increases threading_control's ref count for task_arena 575 threading_control::register_public_reference(); 576 return true; 577 } 578 return false; 579 } 580 581 void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) { 582 thread_data* td = governor::get_thread_data(); // thread data is only needed for FastRandom instance 583 assert_pointer_valid(td, "thread_data pointer should not be null"); 584 arena* a = ta ? 585 ta->my_arena.load(std::memory_order_relaxed) 586 : td->my_arena 587 ; 588 assert_pointer_valid(a, "arena pointer should not be null"); 589 auto* ctx = c ? c : a->my_default_ctx; 590 assert_pointer_valid(ctx, "context pointer should not be null"); 591 // Is there a better place for checking the state of ctx? 592 __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(), 593 "The task will not be executed because its task_group_context is cancelled."); 594 a->enqueue_task(t, *ctx, *td); 595 } 596 597 class nested_arena_context : no_copy { 598 public: 599 nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index) 600 : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext) 601 { 602 if (td.my_arena != &nested_arena) { 603 m_orig_arena = td.my_arena; 604 m_orig_slot_index = td.my_arena_index; 605 m_orig_last_observer = td.my_last_observer; 606 607 td.detach_task_dispatcher(); 608 td.attach_arena(nested_arena, slot_index); 609 if (td.my_inbox.is_idle_state(true)) 610 td.my_inbox.set_is_idle(false); 611 task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher(); 612 td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold); 613 614 // If the calling thread occupies the slots out of external thread reserve we need to notify the 615 // market that this arena requires one worker less. 616 if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) { 617 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1); 618 } 619 620 td.my_last_observer = nullptr; 621 // The task_arena::execute method considers each calling thread as an external thread. 622 td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false); 623 } 624 625 m_task_dispatcher = td.my_task_dispatcher; 626 m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true); 627 m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed; 628 m_task_dispatcher->m_properties.critical_task_allowed = true; 629 630 execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext; 631 ed_ext.context = td.my_arena->my_default_ctx; 632 ed_ext.original_slot = td.my_arena_index; 633 ed_ext.affinity_slot = d1::no_slot; 634 ed_ext.task_disp = td.my_task_dispatcher; 635 ed_ext.isolation = no_isolation; 636 637 __TBB_ASSERT(td.my_arena_slot, nullptr); 638 __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr); 639 __TBB_ASSERT(td.my_task_dispatcher, nullptr); 640 } 641 ~nested_arena_context() { 642 thread_data& td = *m_task_dispatcher->m_thread_data; 643 __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr); 644 m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed); 645 m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed; 646 if (m_orig_arena) { 647 td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false); 648 td.my_last_observer = m_orig_last_observer; 649 650 // Notify the market that this thread releasing a one slot 651 // that can be used by a worker thread. 652 if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) { 653 td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1); 654 } 655 656 td.leave_task_dispatcher(); 657 td.my_arena_slot->release(); 658 td.my_arena->my_exit_monitors.notify_one(); // do not relax! 659 660 td.attach_arena(*m_orig_arena, m_orig_slot_index); 661 td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp); 662 __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr); 663 } 664 td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext; 665 } 666 667 private: 668 execution_data_ext m_orig_execute_data_ext{}; 669 arena* m_orig_arena{ nullptr }; 670 observer_proxy* m_orig_last_observer{ nullptr }; 671 task_dispatcher* m_task_dispatcher{ nullptr }; 672 unsigned m_orig_slot_index{}; 673 bool m_orig_fifo_tasks_allowed{}; 674 bool m_orig_critical_task_allowed{}; 675 }; 676 677 class delegated_task : public d1::task { 678 d1::delegate_base& m_delegate; 679 concurrent_monitor& m_monitor; 680 d1::wait_context& m_wait_ctx; 681 std::atomic<bool> m_completed; 682 d1::task* execute(d1::execution_data& ed) override { 683 const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed); 684 execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext; 685 __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed, 686 "The execute data shall point to the current task dispatcher execute data"); 687 __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr); 688 689 ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx; 690 bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true); 691 try_call([&] { 692 m_delegate(); 693 }).on_completion([&] { 694 ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext; 695 ed_ext.task_disp->allow_fifo_task(fifo_task_allowed); 696 }); 697 698 finalize(); 699 return nullptr; 700 } 701 d1::task* cancel(d1::execution_data&) override { 702 finalize(); 703 return nullptr; 704 } 705 void finalize() { 706 m_wait_ctx.release(); // must precede the wakeup 707 m_monitor.notify([this] (std::uintptr_t ctx) { 708 return ctx == std::uintptr_t(&m_delegate); 709 }); // do not relax, it needs a fence! 710 m_completed.store(true, std::memory_order_release); 711 } 712 public: 713 delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo) 714 : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{} 715 ~delegated_task() override { 716 // The destructor can be called earlier than the m_monitor is notified 717 // because the waiting thread can be released after m_wait_ctx.release_wait. 718 // To close that race we wait for the m_completed signal. 719 spin_wait_until_eq(m_completed, true); 720 } 721 }; 722 723 void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) { 724 arena* a = ta.my_arena.load(std::memory_order_relaxed); 725 __TBB_ASSERT(a != nullptr, nullptr); 726 thread_data* td = governor::get_thread_data(); 727 728 bool same_arena = td->my_arena == a; 729 std::size_t index1 = td->my_arena_index; 730 if (!same_arena) { 731 index1 = a->occupy_free_slot</*as_worker */false>(*td); 732 if (index1 == arena::out_of_arena) { 733 concurrent_monitor::thread_context waiter((std::uintptr_t)&d); 734 d1::wait_context wo(1); 735 d1::task_group_context exec_context(d1::task_group_context::isolated); 736 task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx); 737 738 delegated_task dt(d, a->my_exit_monitors, wo); 739 a->enqueue_task( dt, exec_context, *td); 740 size_t index2 = arena::out_of_arena; 741 do { 742 a->my_exit_monitors.prepare_wait(waiter); 743 if (!wo.continue_execution()) { 744 a->my_exit_monitors.cancel_wait(waiter); 745 break; 746 } 747 index2 = a->occupy_free_slot</*as_worker*/false>(*td); 748 if (index2 != arena::out_of_arena) { 749 a->my_exit_monitors.cancel_wait(waiter); 750 nested_arena_context scope(*td, *a, index2 ); 751 r1::wait(wo, exec_context); 752 __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred 753 break; 754 } 755 a->my_exit_monitors.commit_wait(waiter); 756 } while (wo.continue_execution()); 757 if (index2 == arena::out_of_arena) { 758 // notify a waiting thread even if this thread did not enter arena, 759 // in case it was woken by a leaving thread but did not need to enter 760 a->my_exit_monitors.notify_one(); // do not relax! 761 } 762 // process possible exception 763 auto exception = exec_context.my_exception.load(std::memory_order_acquire); 764 if (exception) { 765 __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled."); 766 exception->throw_self(); 767 } 768 __TBB_ASSERT(governor::is_thread_data_set(td), nullptr); 769 return; 770 } // if (index1 == arena::out_of_arena) 771 } // if (!same_arena) 772 773 context_guard_helper</*report_tasks=*/false> context_guard; 774 context_guard.set_ctx(a->my_default_ctx); 775 nested_arena_context scope(*td, *a, index1); 776 #if _WIN64 777 try { 778 #endif 779 d(); 780 __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr); 781 #if _WIN64 782 } catch (...) { 783 context_guard.restore_default(); 784 throw; 785 } 786 #endif 787 } 788 789 void task_arena_impl::wait(d1::task_arena_base& ta) { 790 arena* a = ta.my_arena.load(std::memory_order_relaxed); 791 __TBB_ASSERT(a != nullptr, nullptr); 792 thread_data* td = governor::get_thread_data(); 793 __TBB_ASSERT_EX(td, "Scheduler is not initialized"); 794 __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" ); 795 if (a->my_max_num_workers != 0) { 796 while (a->num_workers_active() || !a->is_empty()) { 797 yield(); 798 } 799 } 800 } 801 802 int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) { 803 arena* a = nullptr; 804 if( ta ) // for special cases of ta->max_concurrency() 805 a = ta->my_arena.load(std::memory_order_relaxed); 806 else if( thread_data* td = governor::get_thread_data_if_initialized() ) 807 a = td->my_arena; // the current arena if any 808 809 if( a ) { // Get parameters from the arena 810 __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr); 811 int mandatory_worker = 0; 812 if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) { 813 mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0; 814 } 815 return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker; 816 } 817 818 if (ta && ta->my_max_concurrency == 1) { 819 return 1; 820 } 821 822 #if __TBB_ARENA_BINDING 823 if (ta) { 824 d1::constraints arena_constraints = d1::constraints{} 825 .set_numa_id(ta->my_numa_id) 826 .set_core_type(ta->core_type()) 827 .set_max_threads_per_core(ta->max_threads_per_core()); 828 return (int)default_concurrency(arena_constraints); 829 } 830 #endif /*!__TBB_ARENA_BINDING*/ 831 832 __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr); 833 return int(governor::default_num_threads()); 834 } 835 836 void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) { 837 // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it? 838 thread_data* tls = governor::get_thread_data(); 839 assert_pointers_valid(tls, tls->my_task_dispatcher); 840 task_dispatcher* dispatcher = tls->my_task_dispatcher; 841 isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation; 842 try_call([&] { 843 // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard. 844 isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d); 845 // Save the current isolation value and set new one 846 previous_isolation = dispatcher->set_isolation(current_isolation); 847 // Isolation within this callable 848 d(); 849 }).on_completion([&] { 850 __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr); 851 dispatcher->set_isolation(previous_isolation); 852 }); 853 } 854 855 } // namespace r1 856 } // namespace detail 857 } // namespace tbb 858