/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer(){
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/
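
// Usage sketch (not part of this translation unit): the observer above is what
// ultimately pins worker threads when a user constrains an arena through the
// public API. Roughly, assuming the standard oneTBB constraints interface
// (exact spelling may vary between releases):
//
//     std::vector<tbb::numa_node_id> numa_nodes = tbb::info::numa_nodes();
//     tbb::task_arena arena(tbb::task_arena::constraints(numa_nodes[0]));
//     arena.execute([] { /* work runs on threads bound to numa_nodes[0] */ });
//
// construct_binding_observer() is only invoked when the request actually
// constrains placement: a specific core type (when more than one core type
// exists), a specific NUMA node (on a multi-node machine), or a
// max_threads_per_core limit.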

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}
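
// Worker thread lifecycle implemented by arena::process() below: occupy a
// non-reserved slot, attach the thread and the slot's default task_dispatcher,
// run entry observers, then stay in local_wait_for_all() with an
// outermost_worker_waiter until the arena tells the worker to leave; finally
// run exit observers, detach the dispatcher, release the slot and the mailbox,
// and drop the worker reference via on_thread_leaving<ref_worker>().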

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots > 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
    // that arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatch construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_requests = 0;
    my_local_concurrency_flag.clear();
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}
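
// Memory layout produced by allocate_arena() above and released by free_arena()
// below, as implied by the placement-new offset and by the use of
// &mailbox(my_num_slots-1) as the deallocation base:
//
//   low address                                                      high address
//   [ mail_outbox x num_slots | arena (arena_base + slot 0) | slots 1..N-1 | task_dispatcher x num_slots ]
//
// Mailboxes precede the arena object and are reached through negative offsets
// via mailbox(i); the per-slot default task_dispatchers live right after the
// slot array (base_td_pointer in the constructor).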

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
    poison_value( my_guard );
    std::intptr_t drained = 0;
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        drained += mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
    if ( !my_observers.empty() ) {
        my_observers.clear();
    }
    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}
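
// State machine implemented by is_out_of_work() below: my_pool_state moves
//   SNAPSHOT_FULL --CAS--> busy (a unique per-call value, &busy, to avoid ABA)
//   busy --CAS--> SNAPSHOT_EMPTY   if no task pool, FIFO, resume, or critical
//                                  stream task was found (worker demand is dropped), or
//   busy --CAS--> SNAPSHOT_FULL    otherwise (the transition is undone).
// Any thread that spawns or enqueues work may reset the state to SNAPSHOT_FULL
// at any time; that invalidates an in-flight snapshot and makes the call
// return false.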

bool arena::is_out_of_work() {
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    if (my_local_concurrency_flag.try_clear_if([this] {
        return !has_enqueued_tasks();
    })) {
        my_market->adjust_demand(*this, /* delta = */ -1, /* mandatory = */ true);
    }
#endif

    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    switch (my_pool_state.load(std::memory_order_acquire)) {
    case SNAPSHOT_EMPTY:
        return true;
    case SNAPSHOT_FULL: {
        // Use unique id for "busy" in order to avoid ABA problems.
        const pool_state_t busy = pool_state_t(&busy);
        // Helper for CAS execution
        pool_state_t expected_state;

        // Request permission to take snapshot
        expected_state = SNAPSHOT_FULL;
        if (my_pool_state.compare_exchange_strong(expected_state, busy)) {
            // Got permission. Take the snapshot.
            // NOTE: This is not a lock, as the state can be set to FULL at
            //       any moment by a thread that spawns/enqueues new task.
            std::size_t n = my_limit.load(std::memory_order_acquire);
            // Make local copies of volatile parameters. Their change during
            // snapshot taking procedure invalidates the attempt, and returns
            // this thread into the dispatch loop.
            std::size_t k;
            for (k = 0; k < n; ++k) {
                if (my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                    my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed))
                {
                    // k-th primary task pool is nonempty and does contain tasks.
                    break;
                }
                if (my_pool_state.load(std::memory_order_acquire) != busy)
                    return false; // the work was published
            }
            bool work_absent = k == n;
            // Test and test-and-set.
            if (my_pool_state.load(std::memory_order_acquire) == busy) {
                bool no_stream_tasks = !has_enqueued_tasks() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                work_absent = work_absent && no_stream_tasks;
                if (work_absent) {
                    // Save the current demand value before setting SNAPSHOT_EMPTY,
                    // to avoid race with advertise_new_work.
                    int current_demand = (int)my_max_num_workers;
                    expected_state = busy;
                    if (my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_EMPTY)) {
                        // This thread transitioned pool to empty state, and thus is
                        // responsible for telling the market that there is no work to do.
                        my_market->adjust_demand(*this, -current_demand, /* mandatory = */ false);
                        return true;
                    }
                    return false;
                }
                // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                expected_state = busy;
                my_pool_state.compare_exchange_strong(expected_state, SNAPSHOT_FULL);
            }
        }
        return false;
    }
    default:
        // Another thread is taking a snapshot.
        return false;
    }
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}
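
// Worked example of the mapping above, assuming the usual oneTBB definitions
// (priority::low/normal/high == 1/2/3 * d1::priority_stride and
// market::num_priority_levels == 3); if those constants differ, the arithmetic
// shifts accordingly:
//   arena_priority_level(high)   == 3 - 3 == 0
//   arena_priority_level(normal) == 3 - 2 == 1
//   arena_priority_level(low)    == 3 - 1 == 2
// so a smaller level number means a higher priority, and arena_priority() is
// the exact inverse: arena_priority(1) == priority(2 * priority_stride) == normal.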

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, ta);
}

void task_arena_impl::initialize(d1::task_arena_base& ta) {
    governor::one_time_init();
    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING

#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_core_type(ta.core_type())
            .set_max_threads_per_core(ta.max_threads_per_core())
            .set_numa_id(ta.my_numa_id);
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        ta.my_max_concurrency = (int)default_concurrency(ta.my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/

#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    arena* a = market::create_arena(ta.my_max_concurrency, ta.my_num_reserved_slots, priority_level, /* stack_size = */ 0);
    ta.my_arena.store(a, std::memory_order_release);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false);
#if __TBB_ARENA_BINDING
    a->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a->my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_ARENA_BINDING*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
#if __TBB_ARENA_BINDING
    if (a->my_numa_binding_observer != nullptr) {
        destroy_binding_observer(a->my_numa_binding_observer);
        a->my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    a->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    a->on_thread_leaving<arena::ref_external>();
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still in use by this thread, so it will not be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, NULL );
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == a->my_num_slots, NULL);
        ta.my_arena.store(a, std::memory_order_release);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}
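
// Usage sketch for the two entry points above (public API spelling is assumed
// from the oneTBB headers and may differ slightly between releases):
//
//     tbb::task_arena arena(/*max_concurrency=*/4, /*reserved_for_masters=*/1,
//                           tbb::task_arena::priority::high);
//     arena.initialize();                                     // task_arena_impl::initialize
//
//     tbb::task_arena same_arena(tbb::task_arena::attach{});  // task_arena_impl::attach
//
// attach() only succeeds when the calling thread is already inside an arena;
// otherwise the task_arena falls back to ordinary lazy initialization.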

void task_arena_impl::enqueue(d1::task& t, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for FastRandom instance
    arena* a = ta->my_arena.load(std::memory_order_relaxed);
    assert_pointers_valid(ta, a, a->my_default_ctx, td);
    // Is there a better place for checking the state of my_default_ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because default task_group_context of task_arena is cancelled. Has previously enqueued task thrown an exception?");
    a->enqueue_task(t, *a->my_default_ctx, *td);
}
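
// Call path sketch: the public tbb::task_arena::enqueue(f) wraps the user
// functor into a d1::task and forwards it through the exported enqueue() entry
// point to the function above; arena::enqueue_task() then binds the arena's
// default context, pushes the task into my_fifo_task_stream, and wakes workers
// through advertise_new_work<work_enqueued>().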

class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside of the external thread reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ -1, /* mandatory = */ false);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, /* delta = */ 1, /* mandatory = */ false);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() {
        // The destructor can be called before m_monitor is notified because the
        // waiting thread can be released as soon as m_wait_ctx is released.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            if (exec_context.my_exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exec_context.my_exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
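
// Summary of the slow path above: if the calling thread cannot occupy a slot in
// the target arena, the delegate is wrapped into a delegated_task and enqueued,
// and the caller blocks on my_exit_monitors until either the task completes on a
// worker or a slot frees up (in which case the caller enters the arena itself
// and waits in the dispatch loop). From user code this is simply (sketch):
//
//     tbb::task_arena arena(2);
//     arena.execute([] { tbb::parallel_for(0, 100, [](int) { /* ... */ }); });
//
// An exception thrown by the delegate on a worker is captured in exec_context
// and rethrown on the calling thread via throw_self().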
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || a->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
            + (a->my_local_concurrency_flag.test() ? 1 : 0)
#endif
            ;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
#if __TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
#else /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
        return (int)default_concurrency(ta->my_numa_id);
#endif /*!__TBB_PREVIEW_TASK_ARENA_CONSTRAINTS_EXTENSION_PRESENT*/
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb