/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "task_dispatcher.h"
#include "governor.h"
#include "threading_control.h"
#include "arena.h"
#include "itt_notify.h"
#include "semaphore.h"
#include "waiters.h"
#include "oneapi/tbb/detail/_task.h"
#include "oneapi/tbb/info.h"
#include "oneapi/tbb/tbb_allocator.h"

#include <atomic>
#include <cstring>
#include <functional>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_ARENA_BINDING
class numa_binding_observer : public tbb::task_scheduler_observer {
    binding_handler* my_binding_handler;
public:
    numa_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core )
        : task_scheduler_observer(*ta)
        , my_binding_handler(construct_binding_handler(num_slots, numa_id, core_type, max_threads_per_core))
    {}

    void on_scheduler_entry( bool ) override {
        apply_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    void on_scheduler_exit( bool ) override {
        restore_affinity_mask(my_binding_handler, this_task_arena::current_thread_index());
    }

    ~numa_binding_observer() override {
        destroy_binding_handler(my_binding_handler);
    }
};

numa_binding_observer* construct_binding_observer( d1::task_arena* ta, int num_slots, int numa_id, core_type_id core_type, int max_threads_per_core ) {
    numa_binding_observer* binding_observer = nullptr;
    if ((core_type >= 0 && core_type_count() > 1) || (numa_id >= 0 && numa_node_count() > 1) || max_threads_per_core > 0) {
        binding_observer = new(allocate_memory(sizeof(numa_binding_observer))) numa_binding_observer(ta, num_slots, numa_id, core_type, max_threads_per_core);
        __TBB_ASSERT(binding_observer, "Failure during NUMA binding observer allocation and construction");
        binding_observer->observe(true);
    }
    return binding_observer;
}

void destroy_binding_observer( numa_binding_observer* binding_observer ) {
    __TBB_ASSERT(binding_observer, "Trying to deallocate nullptr pointer");
    binding_observer->observe(false);
    binding_observer->~numa_binding_observer();
    deallocate_memory(binding_observer);
}
#endif /*!__TBB_ARENA_BINDING*/

void arena::on_thread_leaving(unsigned ref_param) {
    //
    // The implementation of the arena destruction synchronization logic contained
    // various bugs/flaws at different stages of its evolution, so below is a
    // detailed description of the issues taken into consideration in the current
    // design.
    //
    // When fire-and-forget tasks (scheduled via task::enqueue()) are used, the
    // external thread is allowed to leave its arena before all its work is executed,
    // and the market may temporarily revoke all workers from this arena. Since revoked
    // workers never attempt to reset the arena state to EMPTY and cancel its request
    // to RML for threads, the arena object is destroyed only when the last thread
    // is leaving it and the arena's state is EMPTY (that is, its external thread
    // has left and it does not contain any work).
    // Thus resetting the arena to the EMPTY state (as earlier TBB versions did) should
    // not be done here (or anywhere else in the external thread, for that matter);
    // doing so can result either in the arena's premature destruction (at least without
    // additional costly checks in workers) or in unnecessary arena state changes
    // (and the ensuing worker migration).
    //
    // A worker that checks for the presence of work and transitions the arena to the
    // EMPTY state (in the snapshot-taking procedure arena::out_of_work()) updates
    // arena::my_pool_state first and only then arena::my_num_workers_requested.
    // So the check for the absence of work must be done against the latter field.
    //
    // In the time window between decrementing the active threads count and checking
    // whether there is an outstanding request for workers, a new worker thread may
    // arrive, finish the remaining work, set the arena state to empty, and leave,
    // decrementing its refcount and destroying the arena. Then the current thread
    // would destroy the arena a second time. To preclude this, a local copy of the
    // outstanding request value can be stored before decrementing the active threads
    // count.
    //
    // But this technique may cause two other problems. When the stored request is
    // zero, it is possible that the arena still has threads and they can generate new
    // tasks and thus re-establish non-zero requests. Then all the threads can be
    // revoked (as described above) leaving this thread the last one, and causing
    // it to destroy a non-empty arena.
    //
    // The other problem takes place when the stored request is non-zero. Another
    // thread may complete the work, set the arena state to empty, and leave without
    // arena destruction before this thread decrements the refcount. This thread
    // cannot destroy the arena either. Thus the arena may be "orphaned".
    //
    // In both cases we cannot dereference the arena pointer after the refcount is
    // decremented, as our arena may already be destroyed.
    //
    // If this is the external thread, the market is kept alive by the reference this
    // thread holds to it. In the case of workers, the market's liveness is ensured by
    // the RML connection rundown protocol, according to which the client (i.e. the
    // market) lives until the RML server notifies it about connection termination,
    // and this notification is fired only after all workers return to RML.
    //
    // Thus, if we decremented the refcount to zero, we ask the market to check the
    // arena state (including whether it is still alive) under the lock.
    //

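    // In short: capture everything we still need from `this` before the reference is
    // released below, and touch the arena afterwards only if try_destroy_client()
    // confirms that this thread is the one responsible for destruction.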
    __TBB_ASSERT(my_references.load(std::memory_order_relaxed) >= ref_param, "broken arena reference counter");

    // When there are no workers, someone must free the arena, because
    // without workers no one calls out_of_work().
    if (ref_param == ref_external && !my_mandatory_concurrency.test()) {
        out_of_work();
    }

    threading_control* tc = my_threading_control;
    auto tc_client_snapshot = tc->prepare_client_destruction(my_tc_client);
    // Release our reference to sync with destroy_client
    unsigned remaining_ref = my_references.fetch_sub(ref_param, std::memory_order_release) - ref_param;
    // Do not access `this` below this point: the arena might already be destroyed.
    if (remaining_ref == 0) {
        if (tc->try_destroy_client(tc_client_snapshot)) {
            // We are requested to destroy ourselves
            free_arena();
        }
    }
}

std::size_t arena::occupy_free_slot_in_range( thread_data& tls, std::size_t lower, std::size_t upper ) {
    if ( lower >= upper ) return out_of_arena;
    // Start search for an empty slot from the one we occupied the last time
    std::size_t index = tls.my_arena_index;
    if ( index < lower || index >= upper ) index = tls.my_random.get() % (upper - lower) + lower;
    __TBB_ASSERT( index >= lower && index < upper, nullptr);
    // Find a free slot
    for ( std::size_t i = index; i < upper; ++i )
        if (my_slots[i].try_occupy()) return i;
    for ( std::size_t i = lower; i < index; ++i )
        if (my_slots[i].try_occupy()) return i;
    return out_of_arena;
}

template <bool as_worker>
std::size_t arena::occupy_free_slot(thread_data& tls) {
    // Firstly, external threads try to occupy reserved slots
    std::size_t index = as_worker ? out_of_arena : occupy_free_slot_in_range( tls, 0, my_num_reserved_slots );
    if ( index == out_of_arena ) {
        // Secondly, all threads try to occupy all non-reserved slots
        index = occupy_free_slot_in_range(tls, my_num_reserved_slots, my_num_slots );
        // Likely this arena is already saturated
        if ( index == out_of_arena )
            return out_of_arena;
    }

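    // Publish the newly occupied slot: my_limit is the upper bound of the slot range
    // that other threads scan for work, so raise it to cover this slot if needed.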
    atomic_update( my_limit, (unsigned)(index + 1), std::less<unsigned>() );
    return index;
}

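// Computes the deepest stack address at which this thread is still allowed to steal.
// The address of a local variable approximates the current stack position, and the
// r1::calculate_stealing_threshold() helper derives the threshold from it and the
// configured worker stack size.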
std::uintptr_t arena::calculate_stealing_threshold() {
    stack_anchor_type anchor;
    return r1::calculate_stealing_threshold(reinterpret_cast<std::uintptr_t>(&anchor), my_threading_control->worker_stack_size());
}

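// A worker thread's entire stay in this arena: occupy a slot, run the dispatch loop
// until the outermost waiter decides to leave, then detach from the slot and drop the
// worker reference.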
void arena::process(thread_data& tls) {
    governor::set_thread_data(tls); // TODO: consider moving to create_one_job.
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( my_num_slots >= 1, nullptr);

    std::size_t index = occupy_free_slot</*as_worker*/true>(tls);
    if (index == out_of_arena) {
        on_thread_leaving(ref_worker);
        return;
    }

    my_tc_client.get_pm_client()->register_thread();

    __TBB_ASSERT( index >= my_num_reserved_slots, "Workers cannot occupy reserved slots" );
    tls.attach_arena(*this, index);
    // The worker thread enters the dispatch loop to look for work
    tls.my_inbox.set_is_idle(true);
    if (tls.my_arena_slot->is_task_pool_published()) {
        tls.my_inbox.set_is_idle(false);
    }

    task_dispatcher& task_disp = tls.my_arena_slot->default_task_dispatcher();
    tls.enter_task_dispatcher(task_disp, calculate_stealing_threshold());
    __TBB_ASSERT(task_disp.can_steal(), nullptr);

    __TBB_ASSERT( !tls.my_last_observer, "There cannot be notified local observers when entering arena" );
    my_observers.notify_entry_observers(tls.my_last_observer, tls.my_is_worker);

    // Waiting on a special object tied to this arena
    outermost_worker_waiter waiter(*this);
    d1::task* t = tls.my_task_dispatcher->local_wait_for_all(nullptr, waiter);
    // For purposes of affinity support, the slot's mailbox is considered idle while no thread is
    // attached to it.
    tls.my_inbox.set_is_idle(true);

    __TBB_ASSERT_EX(t == nullptr, "Outermost worker must not leave dispatch loop with a task");
    __TBB_ASSERT(governor::is_thread_data_set(&tls), nullptr);
    __TBB_ASSERT(tls.my_task_dispatcher == &task_disp, nullptr);

    my_observers.notify_exit_observers(tls.my_last_observer, tls.my_is_worker);
    tls.my_last_observer = nullptr;

    tls.leave_task_dispatcher();

    // Arena slot detach (arena may be used in market::process)
    // TODO: Consider moving several calls below into a new method (e.g. detach_arena).
    tls.my_arena_slot->release();
    tls.my_arena_slot = nullptr;
    tls.my_inbox.detach();
    __TBB_ASSERT(tls.my_inbox.is_idle_state(true), nullptr);
    __TBB_ASSERT(is_alive(my_guard), nullptr);

    my_tc_client.get_pm_client()->unregister_thread();

    // In contrast to earlier versions of TBB (before 3.0 U5) it is now possible
    // that the arena may be temporarily left unpopulated by threads. See comments in
    // arena::on_thread_leaving() for more details.
    on_thread_leaving(ref_worker);
    __TBB_ASSERT(tls.my_arena == this, "my_arena is used as a hint when searching the arena to join");
}

arena::arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned priority_level) {
    __TBB_ASSERT( !my_guard, "improperly allocated arena?" );
    __TBB_ASSERT( sizeof(my_slots[0]) % cache_line_size()==0, "arena::slot size not multiple of cache line size" );
    __TBB_ASSERT( is_aligned(this, cache_line_size()), "arena misaligned" );
    my_threading_control = control;
    my_limit = 1;
    // Two slots are mandatory: for the external thread, and for 1 worker (required to support starvation resistant tasks).
    my_num_slots = num_arena_slots(num_slots, num_reserved_slots);
    my_num_reserved_slots = num_reserved_slots;
    my_max_num_workers = num_slots-num_reserved_slots;
    my_priority_level = priority_level;
    my_references = ref_external; // accounts for the external thread
    my_observers.my_arena = this;
    my_co_cache.init(4 * num_slots);
    __TBB_ASSERT ( my_max_num_workers <= my_num_slots, nullptr);
    // Initialize the default context. It should be allocated before task_dispatcher construction.
    my_default_ctx = new (cache_aligned_allocate(sizeof(d1::task_group_context)))
        d1::task_group_context{ d1::task_group_context::isolated, d1::task_group_context::fp_settings };
    // Construct slots. Mark internal synchronization elements for the tools.
    task_dispatcher* base_td_pointer = reinterpret_cast<task_dispatcher*>(my_slots + my_num_slots);
    for( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler && !my_slots[i].task_pool, nullptr);
        __TBB_ASSERT( !my_slots[i].task_pool_ptr, nullptr);
        __TBB_ASSERT( !my_slots[i].my_task_pool_size, nullptr);
        mailbox(i).construct();
        my_slots[i].init_task_streams(i);
        my_slots[i].my_default_task_dispatcher = new(base_td_pointer + i) task_dispatcher(this);
        my_slots[i].my_is_occupied.store(false, std::memory_order_relaxed);
    }
    my_fifo_task_stream.initialize(my_num_slots);
    my_resume_task_stream.initialize(my_num_slots);
#if __TBB_PREVIEW_CRITICAL_TASKS
    my_critical_task_stream.initialize(my_num_slots);
#endif
    my_mandatory_requests = 0;
}

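// A single cache-aligned allocation holds, in order: the mailboxes (one mail_outbox per
// slot), the arena object itself with its trailing arena_slot array, and the per-slot
// task_dispatcher objects. The arena is therefore constructed at an offset of
// num_slots * sizeof(mail_outbox) into the storage, and free_arena() recovers the
// storage pointer as &mailbox(my_num_slots - 1).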
arena& arena::allocate_arena(threading_control* control, unsigned num_slots, unsigned num_reserved_slots,
                             unsigned priority_level)
{
    __TBB_ASSERT( sizeof(base_type) + sizeof(arena_slot) == sizeof(arena), "All arena data fields must go to arena_base" );
    __TBB_ASSERT( sizeof(base_type) % cache_line_size() == 0, "arena slots area misaligned: wrong padding" );
    __TBB_ASSERT( sizeof(mail_outbox) == max_nfs_size, "Mailbox padding is wrong" );
    std::size_t n = allocation_size(num_arena_slots(num_slots, num_reserved_slots));
    unsigned char* storage = (unsigned char*)cache_aligned_allocate(n);
    // Zero all slots to indicate that they are empty
    std::memset( storage, 0, n );

    return *new( storage + num_arena_slots(num_slots, num_reserved_slots) * sizeof(mail_outbox) )
        arena(control, num_slots, num_reserved_slots, priority_level);
}

void arena::free_arena () {
    __TBB_ASSERT( is_alive(my_guard), nullptr);
    __TBB_ASSERT( !my_references.load(std::memory_order_relaxed), "There are threads in the dying arena" );
    __TBB_ASSERT( !my_total_num_workers_requested && !my_num_workers_allotted, "Dying arena requests workers" );
    __TBB_ASSERT( is_empty(), "Inconsistent state of a dying arena" );
#if __TBB_ARENA_BINDING
    if (my_numa_binding_observer != nullptr) {
        destroy_binding_observer(my_numa_binding_observer);
        my_numa_binding_observer = nullptr;
    }
#endif /*__TBB_ARENA_BINDING*/
    poison_value( my_guard );
    for ( unsigned i = 0; i < my_num_slots; ++i ) {
        // __TBB_ASSERT( !my_slots[i].my_scheduler, "arena slot is not empty" );
        // TODO: understand the assertion and modify
        // __TBB_ASSERT( my_slots[i].task_pool == EmptyTaskPool, nullptr);
        __TBB_ASSERT( my_slots[i].head == my_slots[i].tail, nullptr); // TODO: replace by is_quiescent_local_task_pool_empty
        my_slots[i].free_task_pool();
        mailbox(i).drain();
        my_slots[i].my_default_task_dispatcher->~task_dispatcher();
    }
    __TBB_ASSERT(my_fifo_task_stream.empty(), "Not all enqueued tasks were executed");
    __TBB_ASSERT(my_resume_task_stream.empty(), "Not all enqueued tasks were executed");
    // Cleanup coroutines/schedulers cache
    my_co_cache.cleanup();
    my_default_ctx->~task_group_context();
    cache_aligned_deallocate(my_default_ctx);
#if __TBB_PREVIEW_CRITICAL_TASKS
    __TBB_ASSERT( my_critical_task_stream.empty(), "Not all critical tasks were executed");
#endif
    // Clearing the observer list enforces synchronization with observe(false)
    my_observers.clear();

    void* storage = &mailbox(my_num_slots-1);
    __TBB_ASSERT( my_references.load(std::memory_order_relaxed) == 0, nullptr);
    this->~arena();
#if TBB_USE_ASSERT > 1
    std::memset( storage, 0, allocation_size(my_num_slots) );
#endif /* TBB_USE_ASSERT */
    cache_aligned_deallocate( storage );
}

bool arena::has_enqueued_tasks() {
    return !my_fifo_task_stream.empty();
}

void arena::request_workers(int mandatory_delta, int workers_delta, bool wakeup_threads) {
    my_threading_control->adjust_demand(my_tc_client, mandatory_delta, workers_delta);

    if (wakeup_threads) {
        // Notify all sleeping threads that work has appeared in the arena.
        get_waiting_threads_monitor().notify([&] (market_context context) {
            return this == context.my_arena_addr;
        });
    }
}

bool arena::has_tasks() {
    // TODO: rework it to return at least a hint about where a task was found; better if the task itself.
    std::size_t n = my_limit.load(std::memory_order_acquire);
    bool tasks_are_available = false;
    for (std::size_t k = 0; k < n && !tasks_are_available; ++k) {
        tasks_are_available = !my_slots[k].is_empty();
    }
    tasks_are_available = tasks_are_available || has_enqueued_tasks() || !my_resume_task_stream.empty();
#if __TBB_PREVIEW_CRITICAL_TASKS
    tasks_are_available = tasks_are_available || !my_critical_task_stream.empty();
#endif
    return tasks_are_available;
}

void arena::out_of_work() {
    // Try to unset my_mandatory_concurrency first to keep the arena invariants consistent.
    // Otherwise, we might observe my_pool_state == false while my_mandatory_concurrency == true,
    // which is a broken invariant.
    bool disable_mandatory = my_mandatory_concurrency.try_clear_if([this] { return !has_enqueued_tasks(); });
    bool release_workers = my_pool_state.try_clear_if([this] { return !has_tasks(); });

    if (disable_mandatory || release_workers) {
        int mandatory_delta = disable_mandatory ? -1 : 0;
        int workers_delta = release_workers ? -(int)my_max_num_workers : 0;

        if (disable_mandatory && is_arena_workerless()) {
            // We set workers_delta to 1 when mandatory concurrency was enabled, so revert it now
            workers_delta = -1;
        }
        request_workers(mandatory_delta, workers_delta);
    }
}

void arena::set_top_priority(bool is_top_priority) {
    my_is_top_priority.store(is_top_priority, std::memory_order_relaxed);
}

bool arena::is_top_priority() const {
    return my_is_top_priority.load(std::memory_order_relaxed);
}

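// A worker may join only while the number of active workers is below the current
// allotment; on success it takes a worker reference that arena::process() releases
// later via on_thread_leaving(ref_worker).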
bool arena::try_join() {
    if (num_workers_active() < my_num_workers_allotted.load(std::memory_order_relaxed)) {
        my_references += arena::ref_worker;
        return true;
    }
    return false;
}

void arena::set_allotment(unsigned allotment) {
    if (my_num_workers_allotted.load(std::memory_order_relaxed) != allotment) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
}

int arena::update_concurrency(unsigned allotment) {
    int delta = allotment - my_num_workers_allotted.load(std::memory_order_relaxed);
    if (delta != 0) {
        my_num_workers_allotted.store(allotment, std::memory_order_relaxed);
    }
    return delta;
}

std::pair<int, int> arena::update_request(int mandatory_delta, int workers_delta) {
    __TBB_ASSERT(-1 <= mandatory_delta && mandatory_delta <= 1, nullptr);

    int min_workers_request = 0;
    int max_workers_request = 0;

    // Calculate min request
    my_mandatory_requests += mandatory_delta;
    min_workers_request = my_mandatory_requests > 0 ? 1 : 0;

    // Calculate max request
    my_total_num_workers_requested += workers_delta;
    // Clamp the worker request to the interval [0, my_max_num_workers]
    max_workers_request = clamp(my_total_num_workers_requested, 0,
        min_workers_request > 0 && is_arena_workerless() ? 1 : (int)my_max_num_workers);

    return { min_workers_request, max_workers_request };
}

thread_control_monitor& arena::get_waiting_threads_monitor() {
    return my_threading_control->get_waiting_threads_monitor();
}

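// Enqueued (fire-and-forget) tasks bypass the local task pools: the task is bound to
// the given context, gets no isolation tag, and is pushed into the arena-wide FIFO
// stream on a randomly chosen lane. advertise_new_work<work_enqueued>() then requests
// workers (enabling mandatory concurrency for otherwise workerless arenas) so the task
// is not stranded.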
void arena::enqueue_task(d1::task& t, d1::task_group_context& ctx, thread_data& td) {
    task_group_context_impl::bind_to(ctx, &td);
    task_accessor::context(t) = &ctx;
    task_accessor::isolation(t) = no_isolation;
    my_fifo_task_stream.push( &t, random_lane_selector(td.my_random) );
    advertise_new_work<work_enqueued>();
}

arena& arena::create(threading_control* control, unsigned num_slots, unsigned num_reserved_slots, unsigned arena_priority_level, d1::constraints constraints) {
    __TBB_ASSERT(num_slots > 0, nullptr);
    __TBB_ASSERT(num_reserved_slots <= num_slots, nullptr);
    // Add a public market reference for an external thread/task_arena (that adds an internal reference in exchange).
    arena& a = arena::allocate_arena(control, num_slots, num_reserved_slots, arena_priority_level);
    a.my_tc_client = control->create_client(a);
    // The arena should not be published until all of its fields are initialized.
    control->publish_client(a.my_tc_client, constraints);
    return a;
}

} // namespace r1
} // namespace detail
} // namespace tbb

// Enable task_arena.h
#include "oneapi/tbb/task_arena.h" // task_arena_base

namespace tbb {
namespace detail {
namespace r1 {

#if TBB_USE_ASSERT
void assert_arena_priority_valid( tbb::task_arena::priority a_priority ) {
    bool is_arena_priority_correct =
        a_priority == tbb::task_arena::priority::high   ||
        a_priority == tbb::task_arena::priority::normal ||
        a_priority == tbb::task_arena::priority::low;
    __TBB_ASSERT( is_arena_priority_correct,
                  "Task arena priority should be equal to one of the predefined values." );
}
#else
void assert_arena_priority_valid( tbb::task_arena::priority ) {}
#endif

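// Maps the public task_arena::priority values onto internal priority levels, where a
// smaller level number means a higher priority. Assuming the public enum values are
// consecutive multiples of d1::priority_stride (low, normal, high in increasing order),
// high/normal/low map to levels 0/1/2; arena_priority() below is the inverse mapping.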
unsigned arena_priority_level( tbb::task_arena::priority a_priority ) {
    assert_arena_priority_valid( a_priority );
    return d1::num_priority_levels - unsigned(int(a_priority) / d1::priority_stride);
}

tbb::task_arena::priority arena_priority( unsigned priority_level ) {
    auto priority = tbb::task_arena::priority(
        (d1::num_priority_levels - priority_level) * d1::priority_stride
    );
    assert_arena_priority_valid( priority );
    return priority;
}

struct task_arena_impl {
    static void initialize(d1::task_arena_base&);
    static void terminate(d1::task_arena_base&);
    static bool attach(d1::task_arena_base&);
    static void execute(d1::task_arena_base&, d1::delegate_base&);
    static void wait(d1::task_arena_base&);
    static int max_concurrency(const d1::task_arena_base*);
    static void enqueue(d1::task&, d1::task_group_context*, d1::task_arena_base*);
};

void __TBB_EXPORTED_FUNC initialize(d1::task_arena_base& ta) {
    task_arena_impl::initialize(ta);
}
void __TBB_EXPORTED_FUNC terminate(d1::task_arena_base& ta) {
    task_arena_impl::terminate(ta);
}
bool __TBB_EXPORTED_FUNC attach(d1::task_arena_base& ta) {
    return task_arena_impl::attach(ta);
}
void __TBB_EXPORTED_FUNC execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    task_arena_impl::execute(ta, d);
}
void __TBB_EXPORTED_FUNC wait(d1::task_arena_base& ta) {
    task_arena_impl::wait(ta);
}

int __TBB_EXPORTED_FUNC max_concurrency(const d1::task_arena_base* ta) {
    return task_arena_impl::max_concurrency(ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, nullptr, ta);
}

void __TBB_EXPORTED_FUNC enqueue(d1::task& t, d1::task_group_context& ctx, d1::task_arena_base* ta) {
    task_arena_impl::enqueue(t, &ctx, ta);
}

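// Rough call path for reference (illustrative only; assumes the public tbb::task_arena
// API from oneapi/tbb/task_arena.h):
//
//     tbb::task_arena a(/*max_concurrency=*/4, /*reserved_for_masters=*/1);
//     a.initialize();                        // routed to task_arena_impl::initialize()
//     a.execute([] { /* parallel work */ }); // routed to task_arena_impl::execute()
//
// The exported wrappers above are the entry points the public headers bind to.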
void task_arena_impl::initialize(d1::task_arena_base& ta) {
    // Enforce global market initialization to properly initialize soft limit
    (void)governor::get_thread_data();
    d1::constraints arena_constraints;

#if __TBB_ARENA_BINDING
    arena_constraints = d1::constraints{}
        .set_core_type(ta.core_type())
        .set_max_threads_per_core(ta.max_threads_per_core())
        .set_numa_id(ta.my_numa_id);
#endif /*__TBB_ARENA_BINDING*/

    if (ta.my_max_concurrency < 1) {
#if __TBB_ARENA_BINDING
        ta.my_max_concurrency = (int)default_concurrency(arena_constraints);
#else /*!__TBB_ARENA_BINDING*/
        ta.my_max_concurrency = (int)governor::default_num_threads();
#endif /*!__TBB_ARENA_BINDING*/
    }

    __TBB_ASSERT(ta.my_arena.load(std::memory_order_relaxed) == nullptr, "Arena already initialized");
    unsigned priority_level = arena_priority_level(ta.my_priority);
    threading_control* thr_control = threading_control::register_public_reference();
    arena& a = arena::create(thr_control, unsigned(ta.my_max_concurrency), ta.my_num_reserved_slots, priority_level, arena_constraints);

    ta.my_arena.store(&a, std::memory_order_release);
#if __TBB_CPUBIND_PRESENT
    a.my_numa_binding_observer = construct_binding_observer(
        static_cast<d1::task_arena*>(&ta), a.my_num_slots, ta.my_numa_id, ta.core_type(), ta.max_threads_per_core());
#endif /*__TBB_CPUBIND_PRESENT*/
}

void task_arena_impl::terminate(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    assert_pointer_valid(a);
    threading_control::unregister_public_reference(/*blocking_terminate=*/false);
    a->on_thread_leaving(arena::ref_external);
    ta.my_arena.store(nullptr, std::memory_order_relaxed);
}

bool task_arena_impl::attach(d1::task_arena_base& ta) {
    __TBB_ASSERT(!ta.my_arena.load(std::memory_order_relaxed), nullptr);
    thread_data* td = governor::get_thread_data_if_initialized();
    if( td && td->my_arena ) {
        arena* a = td->my_arena;
        // There is an active arena to attach to.
        // It is still in use by this thread, so it won't be destroyed right away.
        __TBB_ASSERT(a->my_references > 0, nullptr);
        a->my_references += arena::ref_external;
        ta.my_num_reserved_slots = a->my_num_reserved_slots;
        ta.my_priority = arena_priority(a->my_priority_level);
        ta.my_max_concurrency = ta.my_num_reserved_slots + a->my_max_num_workers;
        __TBB_ASSERT(arena::num_arena_slots(ta.my_max_concurrency, ta.my_num_reserved_slots) == a->my_num_slots, nullptr);
        ta.my_arena.store(a, std::memory_order_release);
        // Increases threading_control's reference count on behalf of the task_arena
        threading_control::register_public_reference();
        return true;
    }
    return false;
}

void task_arena_impl::enqueue(d1::task& t, d1::task_group_context* c, d1::task_arena_base* ta) {
    thread_data* td = governor::get_thread_data(); // thread data is only needed for FastRandom instance
    assert_pointer_valid(td, "thread_data pointer should not be null");
    arena* a = ta ?
        ta->my_arena.load(std::memory_order_relaxed)
        : td->my_arena
        ;
    assert_pointer_valid(a, "arena pointer should not be null");
    auto* ctx = c ? c : a->my_default_ctx;
    assert_pointer_valid(ctx, "context pointer should not be null");
    // Is there a better place for checking the state of ctx?
    __TBB_ASSERT(!a->my_default_ctx->is_group_execution_cancelled(),
                 "The task will not be executed because its task_group_context is cancelled.");
    a->enqueue_task(t, *ctx, *td);
}

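// RAII helper for executing a delegate in a (possibly different) arena: if the calling
// thread belongs to another arena, the constructor re-attaches the thread and its task
// dispatcher state to the target arena's slot, notifies entry observers, and lowers the
// worker demand when a non-reserved slot is taken; the destructor restores the original
// arena, slot, and dispatcher state.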
class nested_arena_context : no_copy {
public:
    nested_arena_context(thread_data& td, arena& nested_arena, std::size_t slot_index)
        : m_orig_execute_data_ext(td.my_task_dispatcher->m_execute_data_ext)
    {
        if (td.my_arena != &nested_arena) {
            m_orig_arena = td.my_arena;
            m_orig_slot_index = td.my_arena_index;
            m_orig_last_observer = td.my_last_observer;

            td.detach_task_dispatcher();
            td.attach_arena(nested_arena, slot_index);
            if (td.my_inbox.is_idle_state(true))
                td.my_inbox.set_is_idle(false);
            task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
            td.enter_task_dispatcher(task_disp, m_orig_execute_data_ext.task_disp->m_stealing_threshold);

            // If the calling thread occupies a slot outside the external thread reserve, we need
            // to notify the market that this arena requires one worker less.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ -1);
            }

            td.my_last_observer = nullptr;
            // The task_arena::execute method considers each calling thread as an external thread.
            td.my_arena->my_observers.notify_entry_observers(td.my_last_observer, /*worker*/ false);
        }

        m_task_dispatcher = td.my_task_dispatcher;
        m_orig_fifo_tasks_allowed = m_task_dispatcher->allow_fifo_task(true);
        m_orig_critical_task_allowed = m_task_dispatcher->m_properties.critical_task_allowed;
        m_task_dispatcher->m_properties.critical_task_allowed = true;

        execution_data_ext& ed_ext = td.my_task_dispatcher->m_execute_data_ext;
        ed_ext.context = td.my_arena->my_default_ctx;
        ed_ext.original_slot = td.my_arena_index;
        ed_ext.affinity_slot = d1::no_slot;
        ed_ext.task_disp = td.my_task_dispatcher;
        ed_ext.isolation = no_isolation;

        __TBB_ASSERT(td.my_arena_slot, nullptr);
        __TBB_ASSERT(td.my_arena_slot->is_occupied(), nullptr);
        __TBB_ASSERT(td.my_task_dispatcher, nullptr);
    }
    ~nested_arena_context() {
        thread_data& td = *m_task_dispatcher->m_thread_data;
        __TBB_ASSERT(governor::is_thread_data_set(&td), nullptr);
        m_task_dispatcher->allow_fifo_task(m_orig_fifo_tasks_allowed);
        m_task_dispatcher->m_properties.critical_task_allowed = m_orig_critical_task_allowed;
        if (m_orig_arena) {
            td.my_arena->my_observers.notify_exit_observers(td.my_last_observer, /*worker*/ false);
            td.my_last_observer = m_orig_last_observer;

            // Notify the market that this thread is releasing a slot
            // that can be used by a worker thread.
            if (td.my_arena_index >= td.my_arena->my_num_reserved_slots) {
                td.my_arena->request_workers(/* mandatory_delta = */ 0, /* workers_delta = */ 1);
            }

            td.leave_task_dispatcher();
            td.my_arena_slot->release();
            td.my_arena->my_exit_monitors.notify_one(); // do not relax!

            td.attach_arena(*m_orig_arena, m_orig_slot_index);
            td.attach_task_dispatcher(*m_orig_execute_data_ext.task_disp);
            __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
        }
        td.my_task_dispatcher->m_execute_data_ext = m_orig_execute_data_ext;
    }

private:
    execution_data_ext m_orig_execute_data_ext{};
    arena* m_orig_arena{ nullptr };
    observer_proxy* m_orig_last_observer{ nullptr };
    task_dispatcher* m_task_dispatcher{ nullptr };
    unsigned m_orig_slot_index{};
    bool m_orig_fifo_tasks_allowed{};
    bool m_orig_critical_task_allowed{};
};

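// Fallback path of task_arena::execute() for a calling thread that cannot occupy a slot
// in the target arena: the delegate is wrapped into this task, enqueued into the arena's
// FIFO stream, and the caller blocks on the arena's exit monitor until the wrapped call
// completes or is cancelled.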
class delegated_task : public d1::task {
    d1::delegate_base& m_delegate;
    concurrent_monitor& m_monitor;
    d1::wait_context& m_wait_ctx;
    std::atomic<bool> m_completed;
    d1::task* execute(d1::execution_data& ed) override {
        const execution_data_ext& ed_ext = static_cast<const execution_data_ext&>(ed);
        execution_data_ext orig_execute_data_ext = ed_ext.task_disp->m_execute_data_ext;
        __TBB_ASSERT(&ed_ext.task_disp->m_execute_data_ext == &ed,
            "The execute data shall point to the current task dispatcher execute data");
        __TBB_ASSERT(ed_ext.task_disp->m_execute_data_ext.isolation == no_isolation, nullptr);

        ed_ext.task_disp->m_execute_data_ext.context = ed_ext.task_disp->get_thread_data().my_arena->my_default_ctx;
        bool fifo_task_allowed = ed_ext.task_disp->allow_fifo_task(true);
        try_call([&] {
            m_delegate();
        }).on_completion([&] {
            ed_ext.task_disp->m_execute_data_ext = orig_execute_data_ext;
            ed_ext.task_disp->allow_fifo_task(fifo_task_allowed);
        });

        finalize();
        return nullptr;
    }
    d1::task* cancel(d1::execution_data&) override {
        finalize();
        return nullptr;
    }
    void finalize() {
        m_wait_ctx.release(); // must precede the wakeup
        m_monitor.notify([this] (std::uintptr_t ctx) {
            return ctx == std::uintptr_t(&m_delegate);
        }); // do not relax, it needs a fence!
        m_completed.store(true, std::memory_order_release);
    }
public:
    delegated_task(d1::delegate_base& d, concurrent_monitor& s, d1::wait_context& wo)
        : m_delegate(d), m_monitor(s), m_wait_ctx(wo), m_completed{ false } {}
    ~delegated_task() override {
        // The destructor can be called before m_monitor is notified because the waiting
        // thread can be released right after m_wait_ctx is released.
        // To close that race we wait for the m_completed signal.
        spin_wait_until_eq(m_completed, true);
    }
};

void task_arena_impl::execute(d1::task_arena_base& ta, d1::delegate_base& d) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();

    bool same_arena = td->my_arena == a;
    std::size_t index1 = td->my_arena_index;
    if (!same_arena) {
        index1 = a->occupy_free_slot</*as_worker*/false>(*td);
        if (index1 == arena::out_of_arena) {
            concurrent_monitor::thread_context waiter((std::uintptr_t)&d);
            d1::wait_context wo(1);
            d1::task_group_context exec_context(d1::task_group_context::isolated);
            task_group_context_impl::copy_fp_settings(exec_context, *a->my_default_ctx);

            delegated_task dt(d, a->my_exit_monitors, wo);
            a->enqueue_task(dt, exec_context, *td);
            std::size_t index2 = arena::out_of_arena;
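            // Two-phase wait on the arena's exit monitor: prepare_wait() registers this
            // thread, then slot availability and the wait context are re-checked; the wait
            // is either cancelled (the delegate finished, or a slot freed up so it can be
            // dispatched in a nested context right here) or committed until a leaving
            // thread or the delegated task notifies the monitor.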
            do {
                a->my_exit_monitors.prepare_wait(waiter);
                if (!wo.continue_execution()) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    break;
                }
                index2 = a->occupy_free_slot</*as_worker*/false>(*td);
                if (index2 != arena::out_of_arena) {
                    a->my_exit_monitors.cancel_wait(waiter);
                    nested_arena_context scope(*td, *a, index2);
                    r1::wait(wo, exec_context);
                    __TBB_ASSERT(!exec_context.my_exception.load(std::memory_order_relaxed), nullptr); // exception can be thrown above, not deferred
                    break;
                }
                a->my_exit_monitors.commit_wait(waiter);
            } while (wo.continue_execution());
            if (index2 == arena::out_of_arena) {
                // Notify a waiting thread even if this thread did not enter the arena,
                // in case it was woken by a leaving thread but did not need to enter.
                a->my_exit_monitors.notify_one(); // do not relax!
            }
            // Process a possible exception
            auto exception = exec_context.my_exception.load(std::memory_order_acquire);
            if (exception) {
                __TBB_ASSERT(exec_context.is_group_execution_cancelled(), "The task group context with an exception should be canceled.");
                exception->throw_self();
            }
            __TBB_ASSERT(governor::is_thread_data_set(td), nullptr);
            return;
        } // if (index1 == arena::out_of_arena)
    } // if (!same_arena)

    context_guard_helper</*report_tasks=*/false> context_guard;
    context_guard.set_ctx(a->my_default_ctx);
    nested_arena_context scope(*td, *a, index1);
#if _WIN64
    try {
#endif
        d();
        __TBB_ASSERT(same_arena || governor::is_thread_data_set(td), nullptr);
#if _WIN64
    } catch (...) {
        context_guard.restore_default();
        throw;
    }
#endif
}

void task_arena_impl::wait(d1::task_arena_base& ta) {
    arena* a = ta.my_arena.load(std::memory_order_relaxed);
    __TBB_ASSERT(a != nullptr, nullptr);
    thread_data* td = governor::get_thread_data();
    __TBB_ASSERT_EX(td, "Scheduler is not initialized");
    __TBB_ASSERT(td->my_arena != a || td->my_arena_index == 0, "internal_wait is not supported within a worker context" );
    if (a->my_max_num_workers != 0) {
        while (a->num_workers_active() || !a->is_empty()) {
            yield();
        }
    }
}

int task_arena_impl::max_concurrency(const d1::task_arena_base *ta) {
    arena* a = nullptr;
    if( ta ) // for special cases of ta->max_concurrency()
        a = ta->my_arena.load(std::memory_order_relaxed);
    else if( thread_data* td = governor::get_thread_data_if_initialized() )
        a = td->my_arena; // the current arena if any

    if( a ) { // Get parameters from the arena
        __TBB_ASSERT( !ta || ta->my_max_concurrency==1, nullptr);
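        // A workerless arena (max concurrency 1) may still be granted one worker while
        // mandatory concurrency is active for enqueued work; count it so the reported
        // concurrency reflects the thread that actually executes those tasks.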
        int mandatory_worker = 0;
        if (a->is_arena_workerless() && a->my_num_reserved_slots == 1) {
            mandatory_worker = a->my_mandatory_concurrency.test() ? 1 : 0;
        }
        return a->my_num_reserved_slots + a->my_max_num_workers + mandatory_worker;
    }

    if (ta && ta->my_max_concurrency == 1) {
        return 1;
    }

#if __TBB_ARENA_BINDING
    if (ta) {
        d1::constraints arena_constraints = d1::constraints{}
            .set_numa_id(ta->my_numa_id)
            .set_core_type(ta->core_type())
            .set_max_threads_per_core(ta->max_threads_per_core());
        return (int)default_concurrency(arena_constraints);
    }
#endif /*!__TBB_ARENA_BINDING*/

    __TBB_ASSERT(!ta || ta->my_max_concurrency==d1::task_arena_base::automatic, nullptr);
    return int(governor::default_num_threads());
}

void isolate_within_arena(d1::delegate_base& d, std::intptr_t isolation) {
    // TODO: Decide what to do if the scheduler is not initialized. Is there a use case for it?
    thread_data* tls = governor::get_thread_data();
    assert_pointers_valid(tls, tls->my_task_dispatcher);
    task_dispatcher* dispatcher = tls->my_task_dispatcher;
    isolation_type previous_isolation = dispatcher->m_execute_data_ext.isolation;
    try_call([&] {
        // We temporarily change the isolation tag of the currently running task. It will be restored in the destructor of the guard.
        isolation_type current_isolation = isolation ? isolation : reinterpret_cast<isolation_type>(&d);
        // Save the current isolation value and set new one
        previous_isolation = dispatcher->set_isolation(current_isolation);
        // Isolation within this callable
        d();
    }).on_completion([&] {
        __TBB_ASSERT(governor::get_thread_data()->my_task_dispatcher == dispatcher, nullptr);
        dispatcher->set_isolation(previous_isolation);
    });
}

} // namespace r1
} // namespace detail
} // namespace tbb