/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer(){
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

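// Occupies a free arena slot for the calling thread and returns its index, or
// out_of_arena if the arena is saturated. Workers skip the reserved range and
// compete only for non-reserved slots; external threads try the reserved slots
// first and then fall back to the shared range. On success, my_limit is raised
// (monotonically) to cover the newly occupied slot.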
template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr );
    __TBB_ASSERT( my_num_slots > 1, nullptr );

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

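// Constructs an arena in the storage prepared by allocate_arena(). The default
// task_group_context is created first, and one task_dispatcher per slot is then
// placement-constructed in the memory that immediately follows the slot array
// (see base_td_pointer below).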
arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

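// Destroys a quiescent arena: drains the mailboxes, frees the per-slot task pools
// and task dispatchers, releases the internal market reference, and finally
// returns the whole cache-aligned block to the allocator (its start is recovered
// via &mailbox(my_num_slots - 1), mirroring the layout created in allocate_arena).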
void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
    if ( !my_observers.empty() ) {
        my_observers.clear();
    }
    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

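// Checks whether the arena has run out of work. Only one thread at a time takes
// the snapshot: the pool state goes SNAPSHOT_FULL -> busy -> SNAPSHOT_EMPTY. If
// any thread publishes work while the snapshot is being taken, the state reverts
// to SNAPSHOT_FULL and this call returns false; a successful transition to
// SNAPSHOT_EMPTY also withdraws the worker demand from the market.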
bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use a unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take the snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            // any moment by a thread that spawns/enqueues a new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // the snapshot-taking procedure invalidates the attempt and returns
            // this thread to the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // The k-th primary task pool is nonempty and does contain tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // Save the current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid a race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned the pool to the empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo the previous SNAPSHOT_FULL-->busy transition, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

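// The remainder of this file implements the exported entry points behind
// tbb::task_arena (initialize, terminate, attach, execute, wait, enqueue,
// max_concurrency) as well as isolate_within_arena.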
// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    governor::one_time_init();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

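// Drops the references taken on behalf of the task_arena: the public market
// reference and the external arena reference. The NUMA binding observer, if
// one was installed, is destroyed first.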
void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
#if __TBB_ARENA_BINDING
    if( a->my_numa_binding_observer != nullptr ) {
        destroy_binding_observer(a->my_numa_binding_observer);
        a->my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It's still used by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, NULL );
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
        ta.my_arena.store(a, std::memory_order_release);
        // increase the market's ref count for the task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena
        ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

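// RAII guard used by task_arena::execute(): re-binds the calling thread to the
// target arena slot for the duration of the call and restores the original
// arena, task dispatcher, and execution data on destruction. If the occupied
// slot is outside the external thread reserve, the worker demand is reduced by
// one while the guard is active.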
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() {
        // The destructor can be called before m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race, we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

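// Runs the delegate in the target arena. If the calling thread cannot occupy a
// free slot (the arena is saturated), the delegate is wrapped into a
// delegated_task, enqueued into the arena, and the caller blocks on
// my_exit_monitors until either the task completes or a slot becomes available.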
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process a possible exception
            if (exec_context.my_exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exec_context.my_exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

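// Blocks the calling external thread until the arena's workers become quiescent
// (no active workers and the pool state is SNAPSHOT_EMPTY). Not supported from
// inside a worker context, as the assertion below enforces.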
void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb