/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer(){
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

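// Looks for a free slot in the index range [lower, upper). The scan starts from the slot this
// thread occupied last time (or from a random slot in the range) and wraps around, so that
// threads joining concurrently tend to probe different slots first.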
std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots > 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There must be no notified local observers when entering the arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
    // that arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

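// The constructor is invoked via placement new from allocate_arena(), which zero-fills the whole
// allocation first; the asserts below on my_guard and on the slot fields rely on that zeroing.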
arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatch construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

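// Note: judging by the asserts and pointer arithmetic below, the arena lives in a single
// cache-aligned allocation laid out as the slot mailboxes (addressed in reverse order, so
// mailbox(my_num_slots-1) is at the start of the block), followed by the arena itself with its
// slots, followed by the per-slot task_dispatcher objects (see base_td_pointer in the constructor).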
arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
    poison_value( my_guard );
    std::intptr_t drained = 0;
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        drained += mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
    if ( !my_observers.empty() ) {
        my_observers.clear();
    }
    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

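// Checks whether the arena has run out of work. The SNAPSHOT_FULL -> busy -> SNAPSHOT_EMPTY/FULL
// protocol below is a best-effort snapshot: any thread that spawns or enqueues work resets the
// state to SNAPSHOT_FULL and thereby invalidates an in-flight attempt. The thread that completes
// the transition to SNAPSHOT_EMPTY is responsible for withdrawing the workers from the market.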
bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            // any moment by a thread that spawns/enqueues a new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // snapshot taking procedure invalidates the attempt, and returns
            // this thread into the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // k-th primary task pool is nonempty and does contain tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // save current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned pool to empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

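// Mapping between the public task_arena::priority values and the market's priority levels.
// Assuming the predefined priorities are consecutive multiples of d1::priority_stride
// (low < normal < high), a higher public priority maps to a smaller market level, i.e. the level
// numbering is inverted with respect to the enum (e.g. if high == 3 * d1::priority_stride and
// there are three levels, high maps to level 0).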
unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    governor::one_time_init();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

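// Releases the references taken on behalf of this task_arena: the public market reference
// acquired in initialize()/attach() and the external arena reference that keeps the arena alive.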
void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
#if __TBB_ARENA_BINDING
    if(a->my_numa_binding_observer != nullptr ) {
        destroy_binding_observer(a->my_numa_binding_observer);
        a->my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still used by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, NULL );
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
        ta.my_arena.store(a, std::memory_order_release);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena
    ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

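// RAII helper used by task_arena::execute(): if the calling thread currently belongs to a
// different arena, it is temporarily re-attached to the target arena (slot, mailbox, task
// dispatcher, observers) for the duration of the call; the destructor restores the original
// bindings and execution data.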
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside of the external thread reserve, we need
            // to notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /* worker*/false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

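// Task used by task_arena::execute() when the calling thread cannot occupy a slot in the target
// arena: the delegate is enqueued into the arena, while the caller blocks on the arena's exit
// monitor until the associated wait_context is released.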
class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false }{}
    ~delegated_task() {
        // The destructor can be called earlier than the m_monitor is notified
        // because the waiting thread can be released right after m_wait_ctx.release().
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

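// Runs the delegate in the context of the target arena. If the calling thread can occupy a slot,
// the delegate is executed directly under a nested_arena_context; otherwise it is wrapped into a
// delegated_task and enqueued, and the caller waits on the exit monitor, joining the arena to
// help out if a slot becomes available before the delegate has run.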
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task( dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2 );
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            if (exec_context.my_exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exec_context.my_exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

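// Busy-waits (with yields) until the arena has no active workers and its pool state is
// SNAPSHOT_EMPTY, i.e. the arena appears to have no remaining work.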
void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb