/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_ARENA_BINDING*/

void arena::on_thread_leaving(unsigned ref_param) {
    //
    // Implementation of the arena destruction synchronization logic contained various
    // bugs/flaws at the different stages of its evolution, so below is a detailed
    // description of the issues taken into consideration in the framework of the
    // current design.
    //
    // In case of using fire-and-forget tasks (scheduled via task::enqueue())
    // an external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when both the last
    // thread is leaving it and the arena's state is EMPTY (that is, its external thread
    // left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should not
    // be done here (or anywhere else in the external thread, for that matter); doing so
    // can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing workers migration).
    //
    // A worker that checks for work presence and transitions the arena to the EMPTY
    // state (in the snapshot taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for work absence must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may arrive,
    // finish the remaining work, set the arena state to empty, and leave, decrementing its
    // refcount and destroying the arena. Then the current thread would destroy the arena
    // a second time. To preclude this, a local copy of the outstanding request
    // value can be stored before decrementing the active threads count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above) leaving this thread the last one, and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is protected by a refcount held to it.
    // In case of workers, the market's liveness is ensured by the RML connection
    // rundown protocol, according to which the client (i.e. the market) lives
    // until the RML server notifies it about connection termination, and this
    // notification is fired only after all workers return to RML.
    //
    // Thus, if we decremented the refcount to zero, we ask the market to check the arena
    // state (including whether it is still alive) under the lock.
    //

    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");

    // When there are no workers, someone must free the arena, as
    // without workers no one calls out_of_work().
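    // Note: ref_param == ref_external identifies the leaving external thread (workers
    // pass ref_worker); the snapshot is skipped while the mandatory concurrency flag is
    // set, since a worker is then still expected to process the enqueued tasks.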
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // do not access `this`; it might be destroyed already
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourself
            free_arena();
        }
    }
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
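    // Worker entry point: occupy a non-reserved slot, run the outermost dispatch loop
    // until the arena has no more work for this thread, then detach and leave.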
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work.
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                             unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots - 1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // Clear the flags in this order to keep the arena invariants consistent;
    // otherwise we might observe my_pool_state == false while my_mandatory_concurrency == true,
    // which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We had set workers_delta to 1 when mandatory concurrency was enabled, so revert it now.
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

int arena::update_concurrency(unsigned allotment) {
    int delta = allotment - my_num_workers_allotted.load(std::memory_order_relaxed);
    if (delta != 0) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
    return delta;
}

std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate max request
    my_total_num_workers_requested += workers_delta;
    // Clamp the worker request into the interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add a public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // We should not publish the arena until all fields are initialized.
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize the soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by the calling thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // Increase threading_control's reference count on behalf of the task_arena.
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ? ta->my_arena.load(std::memory_order_relaxed) : td->my_arena;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
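    // Note: the assertion below checks only the arena's default context,
    // even when a caller-supplied context is used for the task.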
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker = */ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /* worker = */ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!
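            // The notification above may wake a thread blocked in task_arena_impl::execute()
            // that is waiting for a free slot or for its delegated task to complete.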

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can run before m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race we wait for the m_completed signal.
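        // (m_completed is the last thing finalize() sets, so once it is observed as true,
        // the monitor notification has already been issued.)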
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // Process a possible exception.
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context");
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base* ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency == 1, nullptr);
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency == d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one.
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb
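
// Illustrative usage sketch (not part of the library): the exported entry points above
// back the public oneapi::tbb::task_arena API; assuming a default oneTBB build, client
// code reaches this translation unit roughly as follows.
//
//     #include <oneapi/tbb/task_arena.h>
//     #include <oneapi/tbb/parallel_for.h>
//
//     tbb::task_arena arena(4 /*max_concurrency*/, 1 /*reserved for masters*/);
//     arena.initialize();                          // -> r1::initialize -> task_arena_impl::initialize
//     arena.execute([] {                           // -> r1::execute    -> task_arena_impl::execute
//         tbb::parallel_for(0, 100, [](int) { /* work */ });
//     });
//     arena.enqueue([] { /* fire-and-forget */ }); // -> r1::enqueue    -> task_arena_impl::enqueue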