/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

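// occupy_free_slot performs a two-phase search: external threads first try the
// reserved range [0, my_num_reserved_slots), which workers skip; then both kinds
// of threads compete for the non-reserved range [my_num_reserved_slots, my_num_slots).
// For example, with my_num_slots == 4 and my_num_reserved_slots == 1, a worker only
// probes slots 1..3, while an external thread tries slot 0 first and then slots 1..3.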
template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

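// arena::process is the entry point of a worker thread into this arena: the worker
// occupies a non-reserved slot, attaches its task dispatcher, and spins in
// local_wait_for_all() stealing and executing tasks until the arena runs out of work,
// at which point it detaches from the slot and leaves the arena.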
void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that an arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before the task dispatchers are constructed.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

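// A sketch of the memory block produced by allocate_arena() above and released by
// free_arena() below, as suggested by the placement-new offset, the base_td_pointer
// arithmetic in the constructor, and storage = &mailbox(my_num_slots-1):
//
//   [ mail_outbox x my_num_slots | arena (arena_base + first arena_slot) |
//     remaining arena_slot objects | task_dispatcher x my_num_slots ]
//
// allocation_size() is assumed to account for all of these parts.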
void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, nullptr);
#endif
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );

    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

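// is_out_of_work() drives a small state machine stored in my_pool_state:
//
//   SNAPSHOT_FULL --(CAS by the thread taking a snapshot)--> busy (a unique
//   per-call value) --> SNAPSHOT_EMPTY if no work was found, or back to
//   SNAPSHOT_FULL otherwise. Any thread that spawns or enqueues work may reset
//   the state to SNAPSHOT_FULL at any time, which invalidates an in-flight snapshot.
//
// Only the thread that completes the busy -> SNAPSHOT_EMPTY transition tells the
// market to withdraw the workers this arena had requested.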
bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use a unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take the snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            // any moment by a thread that spawns/enqueues a new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // the snapshot-taking procedure invalidates the attempt and returns
            // this thread to the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // The k-th primary task pool is nonempty and contains tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // Save the current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid a race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned the pool to the empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo the previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

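// Illustrative only: a typical public-API call path that ends up in enqueue_task()
// above (ta and the lambda are user code, not part of this file):
//
//   tbb::task_arena ta;            // lazily creates an arena via task_arena_impl::initialize below
//   ta.enqueue([]{ /* work */ });  // wraps the lambda in a d1::task and calls r1::enqueue,
//                                  // which forwards to task_arena_impl::enqueue below
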
} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

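// Worked example of the mapping above, assuming market::num_priority_levels == 3 and
// the public enumerators defined as low = 1 * d1::priority_stride,
// normal = 2 * d1::priority_stride, high = 3 * d1::priority_stride:
//
//   arena_priority_level(high)   == 3 - 3 == 0   (level 0 is the highest)
//   arena_priority_level(normal) == 3 - 2 == 1
//   arena_priority_level(low)    == 3 - 1 == 2
//
// arena_priority() is the inverse of this mapping.
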
struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize the soft limit
    (void)governor::get_thread_data();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false );
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still in use by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // increases the market's ref count on behalf of the task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
          ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena
        ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // TODO: is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

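// nested_arena_context is an RAII helper used by task_arena_impl::execute(): on entry it
// re-attaches the calling thread to the target arena's slot (saving the original arena,
// slot index, observer position, and execution data), and on exit it restores the saved
// state so the thread continues in its original arena as if nothing happened.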
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker = */ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /* worker = */ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

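// delegated_task wraps the user delegate passed to task_arena::execute() when the
// calling thread cannot occupy a slot in the target arena. The delegate is enqueued
// into the arena so that some thread inside it runs the body, while the caller blocks
// on the arena's exit monitor until finalize() releases the wait context and wakes it.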
class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called before m_monitor is notified
        // because the waiting thread can be released right after m_wait_ctx.release().
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

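// task_arena_impl::execute() has three paths for running the delegate d:
//  1) the calling thread already belongs to the arena - run d directly in place;
//  2) a free slot can be occupied - temporarily join the arena via nested_arena_context
//     and run d there;
//  3) the arena is saturated - enqueue d as a delegated_task and block on
//     my_exit_monitors until it completes (possibly joining the arena if a slot
//     frees up in the meantime).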
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base* ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena, if any

    if( a ) { // Get the parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency == 1, nullptr);
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency == d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set a new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb