/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_NUMA_SUPPORT
class numa_binding_observer : public tbb::task_scheduler_observer {
    int my_numa_node_id;
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int numa_id, int num_slots )
        : task_scheduler_observer(*ta)
        , my_numa_node_id(numa_id)
        , my_binding_handler(construct_binding_handler(num_slots))
    {}

    void on_scheduler_entry( bool ) override {
        bind_thread_to_node(my_binding_handler, this_task_arena::current_thread_index(), my_numa_node_id);
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int numa_id, int num_slots ) {
    numa_binding_observer* binding_observer = nullptr;
    // numa_topology initialization will be lazily performed inside nodes_count() call
    if (numa_id >= 0 && numa_node_count() > 1) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, numa_id, num_slots);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate NULL pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*__TBB_NUMA_SUPPORT*/

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, NULL );
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

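// Occupy a free slot of this arena for the calling thread. Workers skip the reserved
// range and search only [my_num_reserved_slots, my_num_slots); masters first try the
// slots reserved for external threads, [0, my_num_reserved_slots), and then fall back
// to the shared range. Returns out_of_arena if every candidate slot is already occupied.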
template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, masters try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range( tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_market->worker_stack_size());
}

void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr );
    __TBB_ASSERT( my_num_slots > 1, nullptr );

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving<ref_worker>();
        return;
    }
    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);
    tls.attach_task_dispatcher(task_disp);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    task_disp.set_stealing_threshold(0);
    tls.detach_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    // In contrast to earlier versions of TBB (before 3.0 U5) now it is possible
    // that arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving<ref_worker>();
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

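// The constructor is expected to run via the placement new in allocate_arena() below,
// which zero-initializes the whole allocation; the assertions on my_guard and on the
// per-slot fields rely on that zeroed state.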
arena::arena ( market& m, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level )
{
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_market = &m;
    my_limit = 1;
    // Two slots are mandatory: for the master, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots - num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the master
    my_aba_epoch = m.my_arenas_aba_epoch.load(std::memory_order_relaxed);
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT( my_max_num_workers <= my_num_slots, NULL );
    // Initialize the default context. It should be allocated before task_dispatch construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, NULL );
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, NULL );
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, NULL );
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    my_local_concurrency_mode = false;
    my_global_concurrency_mode.store(false, std::memory_order_relaxed);
#endif
}

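// A single arena allocation holds, in order: an array of my_num_slots mail_outbox objects,
// the arena object itself (arena_base followed by the arena_slot array that trails it),
// and one task_dispatcher per slot placed right after the slots (see base_td_pointer in
// the constructor above). The arena pointer therefore does not coincide with the start of
// the allocation; free_arena() recovers the allocation start as &mailbox(my_num_slots-1).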
arena& arena::allocate_arena( market& m, unsigned num_slots, unsigned num_reserved_slots,
                              unsigned priority_level )
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );
    return *new( storage + num_arena_slots(num_slots) * sizeof(mail_outbox) )
        arena(m, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), NULL );
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers,
                  "Inconsistent state of a dying arena" );
#if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
    __TBB_ASSERT( !my_global_concurrency_mode, NULL );
#endif
    poison_value( my_guard );
    std::intptr_t drained = 0;
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, NULL );
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, NULL ); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        drained += mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // remove an internal reference
    my_market->release( /*is_public=*/false, /*blocking_terminate=*/false );
    if ( !my_observers.empty() ) {
        my_observers.clear();
    }
    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, NULL );
    __TBB_ASSERT( my_pool_state.load(std::memory_order_relaxed) == SNAPSHOT_EMPTY || !my_max_num_workers, NULL );
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

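// Checks whether the arena has run out of work. The pool state acts as a small state
// machine: SNAPSHOT_FULL means work may be available, SNAPSHOT_EMPTY means the arena is
// known to be idle, and a unique "busy" value marks a snapshot in progress. Any thread
// that spawns or enqueues work resets the state to SNAPSHOT_FULL, which invalidates a
// concurrent snapshot attempt.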
bool arena::is_out_of_work() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    for(;;) {
        switch( my_pool_state.load(std::memory_order_acquire) ) {
        case SNAPSHOT_EMPTY:
            return true;
        case SNAPSHOT_FULL: {
            // Use unique id for "busy" in order to avoid ABA problems.
            const pool_state_t busy = pool_state_t(&busy);
            // Helper for CAS execution
            pool_state_t expected_state;

            // Request permission to take snapshot
            expected_state = SNAPSHOT_FULL;
            if( my_pool_state.compare_exchange_strong( expected_state, busy ) ) {
                // Got permission. Take the snapshot.
                // NOTE: This is not a lock, as the state can be set to FULL at
                //       any moment by a thread that spawns/enqueues new task.
                std::size_t n = my_limit.load(std::memory_order_acquire);
                // Make local copies of volatile parameters. Their change during
                // snapshot taking procedure invalidates the attempt, and returns
                // this thread into the dispatch loop.
                std::size_t k;
                for( k=0; k<n; ++k ) {
                    if( my_slots[k].task_pool.load(std::memory_order_relaxed) != EmptyTaskPool &&
                        my_slots[k].head.load(std::memory_order_relaxed) < my_slots[k].tail.load(std::memory_order_relaxed) )
                    {
                        // k-th primary task pool is nonempty and does contain tasks.
                        break;
                    }
                    if( my_pool_state.load(std::memory_order_acquire) != busy )
                        return false; // the work was published
                }
                __TBB_ASSERT( k <= n, NULL );
                bool work_absent = k == n;
                // Test and test-and-set.
                if( my_pool_state.load(std::memory_order_acquire) == busy ) {
                    bool no_stream_tasks = my_fifo_task_stream.empty() && my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
                    no_stream_tasks = no_stream_tasks && my_critical_task_stream.empty();
#endif
                    work_absent = work_absent && no_stream_tasks;
                    if( work_absent ) {
                        // save current demand value before setting SNAPSHOT_EMPTY,
                        // to avoid race with advertise_new_work.
                        int current_demand = (int)my_max_num_workers;
                        expected_state = busy;
                        if( my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_EMPTY ) ) {
                            // This thread transitioned pool to empty state, and thus is
                            // responsible for telling the market that there is no work to do.
                            my_market->adjust_demand( *this, -current_demand );
                            return true;
                        }
                        return false;
                    }
                    // Undo previous transition SNAPSHOT_FULL-->busy, unless another thread undid it.
                    expected_state = busy;
                    my_pool_state.compare_exchange_strong( expected_state, SNAPSHOT_FULL );
                }
            }
            return false;
        }
        default:
            // Another thread is taking a snapshot.
            return false;
        }
    }
}

void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return market::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (market::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, ta);
}

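// The __TBB_EXPORTED_FUNC entry points above are what the header-side d1::task_arena
// wrapper (oneapi/tbb/task_arena.h) dispatches into. An illustrative user-level sequence
// that exercises them (a sketch assuming the public tbb::task_arena API, not compiled as
// part of this translation unit):
//
//     tbb::task_arena a(/*max_concurrency=*/4);   // initialize() runs lazily on first use
//     a.execute([] { /* parallel work */ });      // forwards to task_arena_impl::execute
//     a.enqueue([] { /* fire-and-forget */ });    // forwards to task_arena_impl::enqueue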
void task_arena_impl::initialize(d1::task_arena_base& ta) {
    governor::one_time_init();
    if (ta.my_max_concurrency < 1) {
#if __TBB_NUMA_SUPPORT
        ta.my_max_concurrency = numa_default_concurrency(ta.my_numa_id);
#else /*__TBB_NUMA_SUPPORT*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*__TBB_NUMA_SUPPORT*/
    }
    __TBB_ASSERT(ta.my_master_slots <= (unsigned)ta.my_max_concurrency,
                 "Number of slots reserved for master should not exceed arena concurrency");

    __TBB_ASSERT(ta.my_arena == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    ta.my_arena = market::create_arena(ta.my_max_concurrency, ta.my_master_slots, priority_level, 0);
    // add an internal market reference; a public reference was added in create_arena
    market::global_market( /*is_public=*/false );
#if __TBB_NUMA_SUPPORT
    ta.my_arena->my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), ta.my_numa_id, ta.my_arena->my_num_slots);
#endif /*__TBB_NUMA_SUPPORT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    assert_pointer_valid(ta.my_arena);
#if __TBB_NUMA_SUPPORT
    if( ta.my_arena->my_numa_binding_observer != nullptr ) {
        destroy_binding_observer(ta.my_arena->my_numa_binding_observer);
        ta.my_arena->my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_NUMA_SUPPORT*/
    ta.my_arena->my_market->release( /*is_public=*/true, /*blocking_terminate=*/false );
    ta.my_arena->on_thread_leaving<arena::ref_external>();
    ta.my_arena = nullptr;
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena, nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        ta.my_arena = td->my_arena;
        // There is an active arena to attach to.
        // It is still referenced by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(ta.my_arena->my_references > 0, NULL );
        ta.my_arena->my_references += arena::ref_external;
        ta.my_master_slots = ta.my_arena->my_num_reserved_slots;
        ta.my_priority = arena_priority(ta.my_arena->my_priority_level);
        ta.my_max_concurrency = ta.my_master_slots + ta.my_arena->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency) == ta.my_arena->my_num_slots, NULL);
        // increases market's ref count for task_arena
        market::global_market( /*is_public=*/true );
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for the FastRandom instance
    assert_pointers_valid(ta, ta->my_arena, ta->my_arena->my_default_ctx, td);
    // Is there a better place for checking the state of my_default_ctx?
    __TBB_ASSERT(!ta->my_arena->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because the default task_group_context of task_arena is cancelled."
                 " Has a previously enqueued task thrown an exception?");
    ta->my_arena->enqueue_task(t, *ta->my_arena->my_default_ctx, *td);
}

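// RAII guard used by task_arena_impl::execute() to temporarily re-bind the calling thread
// to the target arena. The constructor saves the thread's current arena, slot and task
// dispatcher state and attaches the thread to the slot it occupied in the target arena;
// the destructor undoes all of that, returns the slot, and restores the original
// execution context.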
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            task_disp.set_stealing_threshold(m_orig_execute_data_ext.task_disp->m_stealing_threshold);
            td.attach_task_dispatcher(task_disp);

            // If the calling thread occupies a slot outside of the master reserve, we need to
            // notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as a master.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing one slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->my_market->adjust_demand(*td.my_arena, 1);
            }

            td.my_task_dispatcher->set_stealing_threshold(0);
            td.detach_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

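// Task used by task_arena_impl::execute() when the calling thread cannot occupy a slot in
// the target arena: the user delegate is wrapped into this task and enqueued, while the
// caller blocks on the arena's my_exit_monitors until the task completes or a slot frees up.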
class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
                     "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this](std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() {
        // The destructor can be called before m_monitor is notified because the waiting
        // thread can be released right after m_wait_ctx is released. To close that race
        // we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

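// Executes the delegate in the target arena. The fast path occupies a free slot and runs
// the delegate directly under a nested_arena_context; if no slot is available, the work is
// wrapped into a delegated_task, enqueued, and the calling thread waits (or joins the arena
// later if a slot frees up) before propagating any exception.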
void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    __TBB_ASSERT(ta.my_arena != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == ta.my_arena;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = ta.my_arena->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *ta.my_arena->my_default_ctx);

            delegated_task dt(d, ta.my_arena->my_exit_monitors, wo);
            ta.my_arena->enqueue_task(dt, exec_context, *td);
            size_t index2 = arena::out_of_arena;
            do {
                ta.my_arena->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    ta.my_arena->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = ta.my_arena->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    ta.my_arena->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *ta.my_arena, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception, NULL); // exception can be thrown above, not deferred
                    break;
                }
                ta.my_arena->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // notify a waiting thread even if this thread did not enter arena,
                // in case it was woken by a leaving thread but did not need to enter
                ta.my_arena->my_exit_monitors.notify_one(); // do not relax!
            }
            // process possible exception
            if (exec_context.my_exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exec_context.my_exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(ta.my_arena->my_default_ctx);
    nested_arena_context scope(*td, *ta.my_arena, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    __TBB_ASSERT(ta.my_arena != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != ta.my_arena || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (ta.my_arena->my_max_num_workers != 0) {
        while (ta.my_arena->num_workers_active() || ta.my_arena->my_pool_state.load(std::memory_order_acquire) != arena::SNAPSHOT_EMPTY) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base* ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena;
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, NULL );
        return a->my_num_reserved_slots + a->my_max_num_workers;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, NULL );
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task.
        // It is restored in the on_completion handler below.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set the new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, NULL);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb