/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots > 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5), it is now possible
    // that an arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size() == 0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, nullptr);
#endif
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );

    // clear() enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots - 1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
            return !has_enqueued_tasks();
        })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            // any moment by a thread that spawns/enqueues new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // snapshot taking procedure invalidates the attempt, and returns
            // this thread into the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // k-th primary task pool is nonempty and does contain tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // save current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned pool to empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false );
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need to notify
            // the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called earlier than the m_monitor is notified
        // because the waiting thread can be released after m_wait_ctx.release_wait.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb