xref: /oneTBB/src/tbb/governor.cpp (revision 57f524ca)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "governor.h"
18 #include "main.h"
19 #include "thread_data.h"
20 #include "market.h"
21 #include "arena.h"
22 #include "dynamic_link.h"
23 #include "concurrent_monitor.h"
24 
25 #include "oneapi/tbb/task_group.h"
26 #include "oneapi/tbb/global_control.h"
27 #include "oneapi/tbb/tbb_allocator.h"
28 #include "oneapi/tbb/info.h"
29 
30 #include "task_dispatcher.h"
31 
32 #include <cstdio>
33 #include <cstdlib>
34 #include <cstring>
35 #include <atomic>
36 #include <algorithm>
37 
38 namespace tbb {
39 namespace detail {
40 namespace r1 {
41 
42 void clear_address_waiter_table();
43 
44 //! Defined in global_control.cpp
45 bool remove_and_check_if_empty(d1::global_control& gc);
46 bool is_present(d1::global_control& gc);
47 
48 namespace rml {
49 tbb_server* make_private_server( tbb_client& client );
50 } // namespace rml
51 
52 namespace system_topology {
53     void destroy();
54 }
55 
56 //------------------------------------------------------------------------
57 // governor
58 //------------------------------------------------------------------------
59 
60 void governor::acquire_resources () {
61 #if __TBB_USE_POSIX
62     int status = theTLS.create(auto_terminate);
63 #else
64     int status = theTLS.create();
65 #endif
66     if( status )
67         handle_perror(status, "TBB failed to initialize task scheduler TLS\n");
68     detect_cpu_features(cpu_features);
69 
70     is_rethrow_broken = gcc_rethrow_exception_broken();
71 }
72 
73 void governor::release_resources () {
74     theRMLServerFactory.close();
75     destroy_process_mask();
76 
77     __TBB_ASSERT(!(__TBB_InitOnce::initialization_done() && theTLS.get()), "TBB is unloaded while thread data still alive?");
78 
79     int status = theTLS.destroy();
80     if( status )
81         runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
82     clear_address_waiter_table();
83 
84     system_topology::destroy();
85     dynamic_unlink_all();
86 }
87 
88 rml::tbb_server* governor::create_rml_server ( rml::tbb_client& client ) {
89     rml::tbb_server* server = nullptr;
90     if( !UsePrivateRML ) {
91         ::rml::factory::status_type status = theRMLServerFactory.make_server( server, client );
92         if( status != ::rml::factory::st_success ) {
93             UsePrivateRML = true;
94             runtime_warning( "rml::tbb_factory::make_server failed with status %x, falling back on private rml", status );
95         }
96     }
97     if ( !server ) {
98         __TBB_ASSERT( UsePrivateRML, nullptr);
99         server = rml::make_private_server( client );
100     }
101     __TBB_ASSERT( server, "Failed to create RML server" );
102     return server;
103 }
104 
105 void governor::one_time_init() {
106     if ( !__TBB_InitOnce::initialization_done() ) {
107         DoOneTimeInitialization();
108     }
109 }
110 
111 /*
112     There is no portable way to get the stack base address on POSIX; however, modern
113     Linux versions provide the pthread_getattr_np API that can be used to obtain a thread's
114     stack size and base address. Unfortunately even this function does not provide
115     enough information for the main thread on IA-64 architecture (RSE spill area
116     and memory stack are allocated as two separate discontinuous chunks of memory),
117     and there is no portable way to discern the main and the secondary threads.
118     Thus, for macOS* and for the IA-64 architecture on Linux* OS, we use the TBB worker stack size for
119     all threads and use the current stack top as the stack base. This simplified
120     approach is based on the following assumptions:
121     1) If the default stack size is insufficient for the user app's needs, the
122     required amount will be explicitly specified by the user at the point of
123     TBB scheduler initialization (as an argument to the tbb::task_scheduler_init
124     constructor).
125     2) When an external thread initializes the scheduler, it has enough space on its
126     stack. Here "enough" means "at least as much as worker threads have".
127     3) If the user app strives to conserve memory by cutting the stack size, it
128     should do so for TBB workers too (as in #1).
129 */
130 static std::uintptr_t get_stack_base(std::size_t stack_size) {
131     // Stacks grow top-down. The highest address is called the "stack base",
132     // and the lowest is the "stack limit".
133 #if __TBB_USE_WINAPI
134     suppress_unused_warning(stack_size);
135     NT_TIB* pteb = (NT_TIB*)NtCurrentTeb();
136     __TBB_ASSERT(&pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB");
137     return reinterpret_cast<std::uintptr_t>(pteb->StackBase);
138 #else
139     // There is no portable way to get the stack base address on POSIX, so we use
140     // a non-portable method (available on all modern Linux systems) or a simplified
141     // approach based on common-sense assumptions. The most important assumption
142     // is that the main thread's stack size is not less than that of other threads.
143 
144     // Points to the lowest addressable byte of a stack.
145     void* stack_limit = nullptr;
146 #if __linux__ && !__bg__
147     size_t np_stack_size = 0;
148     pthread_attr_t np_attr_stack;
149     if (0 == pthread_getattr_np(pthread_self(), &np_attr_stack)) {
150         if (0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size)) {
151             __TBB_ASSERT( &stack_limit > stack_limit, "stack size must be positive" );
152         }
153         pthread_attr_destroy(&np_attr_stack);
154     }
155 #endif /* __linux__ */
156     std::uintptr_t stack_base{};
157     if (stack_limit) {
158         stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
159     } else {
160         // Use an anchor as a base stack address.
161         int anchor{};
162         stack_base = reinterpret_cast<std::uintptr_t>(&anchor);
163     }
164     return stack_base;
165 #endif /* __TBB_USE_WINAPI */
166 }
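// A minimal sketch of how the returned value is consumed: init_external_thread() below
// combines the stack base with the stack size to compute a stealing threshold for the
// task dispatcher. Assuming a midpoint heuristic for calculate_stealing_threshold()
// (an assumption for illustration; the actual formula is defined elsewhere in the scheduler):
//
//     std::size_t    stack_size = 4 * 1024 * 1024;             // e.g. a 4 MB stack
//     std::uintptr_t stack_base = get_stack_base(stack_size);  // highest stack address
//     std::uintptr_t threshold  = stack_base - stack_size / 2; // assumed midpoint rule
//     // Stealing is avoided once the stack pointer drops below `threshold`, keeping
//     // the remaining stack as head room for executing the stolen tasks.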
167 
168 #if (_WIN32||_WIN64) && !__TBB_DYNAMIC_LOAD_ENABLED
169 static void register_external_thread_destructor() {
170     struct thread_destructor {
171         ~thread_destructor() {
172             governor::terminate_external_thread();
173         }
174     };
175     // ~thread_destructor() will be called during the calling thread's termination
176     static thread_local thread_destructor thr_destructor;
177 }
178 #endif // (_WIN32||_WIN64) && !__TBB_DYNAMIC_LOAD_ENABLED
179 
180 void governor::init_external_thread() {
181     one_time_init();
182     // Create new scheduler instance with arena
183     int num_slots = default_num_threads();
184     // TODO_REVAMP: support an external thread without an implicit arena
185     int num_reserved_slots = 1;
186     unsigned arena_priority_level = 1; // corresponds to tbb::task_arena::priority::normal
187     std::size_t stack_size = 0;
188     arena& a = *market::create_arena(num_slots, num_reserved_slots, arena_priority_level, stack_size);
189     // We need an internal reference to the market. TODO: is it legacy?
190     market::global_market(false);
191     // External thread always occupies the first slot
192     thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
193     td.attach_arena(a, /*slot index*/ 0);
194     __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
195 
196     stack_size = a.my_market->worker_stack_size();
197     std::uintptr_t stack_base = get_stack_base(stack_size);
198     task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
199     task_disp.set_stealing_threshold(calculate_stealing_threshold(stack_base, stack_size));
200     td.attach_task_dispatcher(task_disp);
201 
202     td.my_arena_slot->occupy();
203     a.my_market->add_external_thread(td);
204     set_thread_data(td);
205 #if (_WIN32||_WIN64) && !__TBB_DYNAMIC_LOAD_ENABLED
206     // The external thread destructor is normally called from DllMain, which is not available with a static build.
207     // Therefore, we need to register the current thread so the destructor is called at thread termination.
208     register_external_thread_destructor();
209 #endif
210 }
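// Usage sketch (public API view, illustrative only): an external thread reaches
// init_external_thread() lazily, on its first use of a parallel algorithm; user code
// never calls it directly.
//
//     #include <oneapi/tbb/parallel_for.h>
//     #include <thread>
//
//     void user_thread_fn() {
//         // The first parallel call on this thread creates its thread_data, attaches it
//         // to an implicit arena, and occupies slot 0 (see init_external_thread() above).
//         oneapi::tbb::parallel_for(0, 1000, [](int) { /* user work */ });
//     }   // later, governor::auto_terminate() releases the slot when the thread exits
//
//     int main() {
//         std::thread t(user_thread_fn);
//         t.join();
//     }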
211 
212 void governor::auto_terminate(void* tls) {
213     __TBB_ASSERT(get_thread_data_if_initialized() == nullptr ||
214         get_thread_data_if_initialized() == tls, nullptr);
215     if (tls) {
216         thread_data* td = static_cast<thread_data*>(tls);
217 
218         // Only an external thread can be inside an arena during termination.
219         if (td->my_arena_slot) {
220             arena* a = td->my_arena;
221             market* m = a->my_market;
222 
223             a->my_observers.notify_exit_observers(td->my_last_observer, td->my_is_worker);
224 
225             td->my_task_dispatcher->m_stealing_threshold = 0;
226             td->detach_task_dispatcher();
227             td->my_arena_slot->release();
228             // Release an arena
229             a->on_thread_leaving<arena::ref_external>();
230 
231             m->remove_external_thread(*td);
232             // If there was an associated arena, it added a public market reference
233             m->release( /*is_public*/ true, /*blocking_terminate*/ false);
234         }
235 
236         td->~thread_data();
237         cache_aligned_deallocate(td);
238 
239         clear_thread_data();
240     }
241     __TBB_ASSERT(get_thread_data_if_initialized() == nullptr, nullptr);
242 }
243 
244 void governor::initialize_rml_factory () {
245     ::rml::factory::status_type res = theRMLServerFactory.open();
246     UsePrivateRML = res != ::rml::factory::st_success;
247 }
248 
249 void __TBB_EXPORTED_FUNC get(d1::task_scheduler_handle& handle) {
250     handle.m_ctl = new(allocate_memory(sizeof(global_control))) global_control(global_control::scheduler_handle, 1);
251 }
252 
253 void release_impl(d1::task_scheduler_handle& handle) {
254     if (handle.m_ctl != nullptr) {
255         handle.m_ctl->~global_control();
256         deallocate_memory(handle.m_ctl);
257         handle.m_ctl = nullptr;
258     }
259 }
260 
261 bool finalize_impl(d1::task_scheduler_handle& handle) {
262     __TBB_ASSERT_RELEASE(handle, "trying to finalize with null handle");
263     market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
264     bool ok = true; // ok if theMarket does not exist yet
265     market* m = market::theMarket; // read the state of theMarket
266     if (m != nullptr) {
267         lock.release();
268         __TBB_ASSERT(is_present(*handle.m_ctl), "finalize or release was already called on this object");
269         thread_data* td = governor::get_thread_data_if_initialized();
270         if (td) {
271             task_dispatcher* task_disp = td->my_task_dispatcher;
272             __TBB_ASSERT(task_disp, nullptr);
273             if (task_disp->m_properties.outermost && !td->my_is_worker) { // is not inside a parallel region
274                 governor::auto_terminate(td);
275             }
276         }
277         if (remove_and_check_if_empty(*handle.m_ctl)) {
278             ok = m->release(/*is_public*/ true, /*blocking_terminate*/ true);
279         } else {
280             ok = false;
281         }
282     }
283     return ok;
284 }
285 
286 bool __TBB_EXPORTED_FUNC finalize(d1::task_scheduler_handle& handle, std::intptr_t mode) {
287     if (mode == d1::release_nothrowing) {
288         release_impl(handle);
289         return true;
290     } else {
291         bool ok = finalize_impl(handle);
292         // TODO: this is unsafe when finalize is called concurrently or when the library is subsequently unloaded
293         release_impl(handle);
294         if (mode == d1::finalize_throwing && !ok) {
295             throw_exception(exception_id::unsafe_wait);
296         }
297         return ok;
298     }
299 }
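// Usage sketch: get(), release_impl() and finalize() above back the public
// tbb::task_scheduler_handle API. Assuming a 2021-era revision where the feature is
// still a preview (guarded by TBB_PREVIEW_WAITING_FOR_WORKERS), user code looks
// roughly like this:
//
//     #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
//     #include <oneapi/tbb/global_control.h>
//     #include <new>  // std::nothrow
//
//     int main() {
//         auto handle = oneapi::tbb::task_scheduler_handle::get();
//         // ... run parallel work ...
//         // Blocking, non-throwing finalization (d1::finalize_nothrowing mode above);
//         // returns false if the worker threads could not be joined.
//         bool joined = oneapi::tbb::finalize(handle, std::nothrow);
//         return joined ? 0 : 1;
//     }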
300 
301 #if __TBB_ARENA_BINDING
302 
303 #if __TBB_WEAK_SYMBOLS_PRESENT
304 #pragma weak __TBB_internal_initialize_system_topology
305 #pragma weak __TBB_internal_destroy_system_topology
306 #pragma weak __TBB_internal_allocate_binding_handler
307 #pragma weak __TBB_internal_deallocate_binding_handler
308 #pragma weak __TBB_internal_apply_affinity
309 #pragma weak __TBB_internal_restore_affinity
310 #pragma weak __TBB_internal_get_default_concurrency
311 
312 extern "C" {
313 void __TBB_internal_initialize_system_topology(
314     size_t groups_num,
315     int& numa_nodes_count, int*& numa_indexes_list,
316     int& core_types_count, int*& core_types_indexes_list
317 );
318 void __TBB_internal_destroy_system_topology( );
319 
320 //TODO: consider renaming to `create_binding_handler` and `destroy_binding_handler`
321 binding_handler* __TBB_internal_allocate_binding_handler( int slot_num, int numa_id, int core_type_id, int max_threads_per_core );
322 void __TBB_internal_deallocate_binding_handler( binding_handler* handler_ptr );
323 
324 void __TBB_internal_apply_affinity( binding_handler* handler_ptr, int slot_num );
325 void __TBB_internal_restore_affinity( binding_handler* handler_ptr, int slot_num );
326 
327 int __TBB_internal_get_default_concurrency( int numa_id, int core_type_id, int max_threads_per_core );
328 }
329 #endif /* __TBB_WEAK_SYMBOLS_PRESENT */
330 
331 // Stubs that will be used if the TBBbind library is unavailable.
332 static void dummy_destroy_system_topology ( ) { }
333 static binding_handler* dummy_allocate_binding_handler ( int, int, int, int ) { return nullptr; }
334 static void dummy_deallocate_binding_handler ( binding_handler* ) { }
335 static void dummy_apply_affinity ( binding_handler*, int ) { }
336 static void dummy_restore_affinity ( binding_handler*, int ) { }
337 static int dummy_get_default_concurrency( int, int, int ) { return governor::default_num_threads(); }
338 
339 // Handlers for communication with TBBbind
340 static void (*initialize_system_topology_ptr)(
341     size_t groups_num,
342     int& numa_nodes_count, int*& numa_indexes_list,
343     int& core_types_count, int*& core_types_indexes_list
344 ) = nullptr;
345 static void (*destroy_system_topology_ptr)( ) = dummy_destroy_system_topology;
346 
347 static binding_handler* (*allocate_binding_handler_ptr)( int slot_num, int numa_id, int core_type_id, int max_threads_per_core )
348     = dummy_allocate_binding_handler;
349 static void (*deallocate_binding_handler_ptr)( binding_handler* handler_ptr )
350     = dummy_deallocate_binding_handler;
351 static void (*apply_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
352     = dummy_apply_affinity;
353 static void (*restore_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
354     = dummy_restore_affinity;
355 int (*get_default_concurrency_ptr)( int numa_id, int core_type_id, int max_threads_per_core )
356     = dummy_get_default_concurrency;
357 
358 #if _WIN32 || _WIN64 || __unix__
359 // Table describing how to link the handlers.
360 static const dynamic_link_descriptor TbbBindLinkTable[] = {
361     DLD(__TBB_internal_initialize_system_topology, initialize_system_topology_ptr),
362     DLD(__TBB_internal_destroy_system_topology, destroy_system_topology_ptr),
363     DLD(__TBB_internal_allocate_binding_handler, allocate_binding_handler_ptr),
364     DLD(__TBB_internal_deallocate_binding_handler, deallocate_binding_handler_ptr),
365     DLD(__TBB_internal_apply_affinity, apply_affinity_ptr),
366     DLD(__TBB_internal_restore_affinity, restore_affinity_ptr),
367     DLD(__TBB_internal_get_default_concurrency, get_default_concurrency_ptr)
368 };
369 
370 static const unsigned LinkTableSize = sizeof(TbbBindLinkTable) / sizeof(dynamic_link_descriptor);
371 
372 #if TBB_USE_DEBUG
373 #define DEBUG_SUFFIX "_debug"
374 #else
375 #define DEBUG_SUFFIX
376 #endif /* TBB_USE_DEBUG */
377 
378 #if _WIN32 || _WIN64
379 #define LIBRARY_EXTENSION ".dll"
380 #define LIBRARY_PREFIX
381 #elif __unix__
382 #define LIBRARY_EXTENSION __TBB_STRING(.so.3)
383 #define LIBRARY_PREFIX "lib"
384 #endif /* __unix__ */
385 
386 #define TBBBIND_NAME LIBRARY_PREFIX "tbbbind" DEBUG_SUFFIX LIBRARY_EXTENSION
387 #define TBBBIND_2_0_NAME LIBRARY_PREFIX "tbbbind_2_0" DEBUG_SUFFIX LIBRARY_EXTENSION
388 
389 #define TBBBIND_2_5_NAME LIBRARY_PREFIX "tbbbind_2_5" DEBUG_SUFFIX LIBRARY_EXTENSION
390 #endif /* _WIN32 || _WIN64 || __unix__ */
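// Worked expansion of the names above, for release builds: on Linux the candidates are
// "libtbbbind_2_5.so.3", "libtbbbind_2_0.so.3" and "libtbbbind.so.3"; on Windows they are
// "tbbbind_2_5.dll", "tbbbind_2_0.dll" and "tbbbind.dll". With TBB_USE_DEBUG the "_debug"
// suffix is appended, e.g. "tbbbind_2_5_debug.dll". The search order is defined in
// load_tbbbind_shared_object() below.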
391 
392 // Representation of system hardware topology information on the TBB side.
393 // System topology may be initialized by a third-party component (e.g. hwloc)
394 // or just filled in with default stubs.
395 namespace system_topology {
396 
397 constexpr int automatic = -1;
398 
399 static std::atomic<do_once_state> initialization_state;
400 
401 namespace {
402 int  numa_nodes_count = 0;
403 int* numa_nodes_indexes = nullptr;
404 
405 int  core_types_count = 0;
406 int* core_types_indexes = nullptr;
407 
408 const char* load_tbbbind_shared_object() {
409 #if _WIN32 || _WIN64 || __unix__
410 #if _WIN32 && !_WIN64
411     // For 32-bit Windows applications, process affinity masks can only support up to 32 logical CPUs.
412     SYSTEM_INFO si;
413     GetNativeSystemInfo(&si);
414     if (si.dwNumberOfProcessors > 32) return nullptr;
415 #endif /* _WIN32 && !_WIN64 */
416     for (const auto& tbbbind_version : {TBBBIND_2_5_NAME, TBBBIND_2_0_NAME, TBBBIND_NAME}) {
417         if (dynamic_link(tbbbind_version, TbbBindLinkTable, LinkTableSize, nullptr, DYNAMIC_LINK_LOCAL_BINDING)) {
418             return tbbbind_version;
419         }
420     }
421 #endif /* _WIN32 || _WIN64 || __unix__ */
422     return nullptr;
423 }
424 
425 int processor_groups_num() {
426 #if _WIN32
427     return NumberOfProcessorGroups();
428 #else
429     // Stub to improve code readability by reducing the number of compile-time conditions
430     return 1;
431 #endif
432 }
433 } // anonymous namespace
434 
435 // Tries to load the TBBbind library API; on success, gets the NUMA topology information from it,
436 // otherwise fills the NUMA topology with stubs.
437 void initialization_impl() {
438     governor::one_time_init();
439 
440     if (const char* tbbbind_name = load_tbbbind_shared_object()) {
441         initialize_system_topology_ptr(
442             processor_groups_num(),
443             numa_nodes_count, numa_nodes_indexes,
444             core_types_count, core_types_indexes
445         );
446 
447         PrintExtraVersionInfo("TBBBIND", tbbbind_name);
448         return;
449     }
450 
451     static int dummy_index = automatic;
452 
453     numa_nodes_count = 1;
454     numa_nodes_indexes = &dummy_index;
455 
456     core_types_count = 1;
457     core_types_indexes = &dummy_index;
458 
459     PrintExtraVersionInfo("TBBBIND", "UNAVAILABLE");
460 }
461 
462 void initialize() {
463     atomic_do_once(initialization_impl, initialization_state);
464 }
465 
466 void destroy() {
467     destroy_system_topology_ptr();
468 }
469 } // namespace system_topology
470 
471 binding_handler* construct_binding_handler(int slot_num, int numa_id, int core_type_id, int max_threads_per_core) {
472     system_topology::initialize();
473     return allocate_binding_handler_ptr(slot_num, numa_id, core_type_id, max_threads_per_core);
474 }
475 
476 void destroy_binding_handler(binding_handler* handler_ptr) {
477     __TBB_ASSERT(deallocate_binding_handler_ptr, "tbbbind loading was not performed");
478     deallocate_binding_handler_ptr(handler_ptr);
479 }
480 
481 void apply_affinity_mask(binding_handler* handler_ptr, int slot_index) {
482     __TBB_ASSERT(slot_index >= 0, "Negative thread index");
483     __TBB_ASSERT(apply_affinity_ptr, "tbbbind loading was not performed");
484     apply_affinity_ptr(handler_ptr, slot_index);
485 }
486 
487 void restore_affinity_mask(binding_handler* handler_ptr, int slot_index) {
488     __TBB_ASSERT(slot_index >= 0, "Negative thread index");
489     __TBB_ASSERT(restore_affinity_ptr, "tbbbind loading was not performed");
490     restore_affinity_ptr(handler_ptr, slot_index);
491 }
492 
493 unsigned __TBB_EXPORTED_FUNC numa_node_count() {
494     system_topology::initialize();
495     return system_topology::numa_nodes_count;
496 }
497 
498 void __TBB_EXPORTED_FUNC fill_numa_indices(int* index_array) {
499     system_topology::initialize();
500     std::memcpy(index_array, system_topology::numa_nodes_indexes, system_topology::numa_nodes_count * sizeof(int));
501 }
502 
503 int __TBB_EXPORTED_FUNC numa_default_concurrency(int node_id) {
504     if (node_id >= 0) {
505         system_topology::initialize();
506         int result = get_default_concurrency_ptr(
507             node_id,
508             /*core_type*/system_topology::automatic,
509             /*threads_per_core*/system_topology::automatic
510         );
511         if (result > 0) return result;
512     }
513     return governor::default_num_threads();
514 }
515 
516 unsigned __TBB_EXPORTED_FUNC core_type_count(intptr_t /*reserved*/) {
517     system_topology::initialize();
518     return system_topology::core_types_count;
519 }
520 
521 void __TBB_EXPORTED_FUNC fill_core_type_indices(int* index_array, intptr_t /*reserved*/) {
522     system_topology::initialize();
523     std::memcpy(index_array, system_topology::core_types_indexes, system_topology::core_types_count * sizeof(int));
524 }
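// Usage sketch: numa_node_count()/fill_numa_indices() and core_type_count()/
// fill_core_type_indices() back the public tbb::info queries, typically combined with
// task_arena constraints. A minimal sketch, assuming the oneTBB 2021 public headers:
//
//     #include <oneapi/tbb/info.h>
//     #include <oneapi/tbb/task_arena.h>
//     #include <oneapi/tbb/parallel_for.h>
//
//     void run_on_first_numa_node() {
//         // When TBBbind is unavailable, this returns a single "automatic" (-1) entry.
//         std::vector<oneapi::tbb::numa_node_id> nodes = oneapi::tbb::info::numa_nodes();
//         oneapi::tbb::task_arena arena(oneapi::tbb::task_arena::constraints(nodes[0]));
//         arena.execute([] {
//             oneapi::tbb::parallel_for(0, 1000, [](int) { /* work bound to that node */ });
//         });
//     }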
525 
526 void constraints_assertion(d1::constraints c) {
527     bool is_topology_initialized = system_topology::initialization_state == do_once_state::initialized;
528     __TBB_ASSERT_RELEASE(c.max_threads_per_core == system_topology::automatic || c.max_threads_per_core > 0,
529         "Wrong max_threads_per_core constraints field value.");
530 
531     auto numa_nodes_begin = system_topology::numa_nodes_indexes;
532     auto numa_nodes_end = system_topology::numa_nodes_indexes + system_topology::numa_nodes_count;
533     __TBB_ASSERT_RELEASE(
534         c.numa_id == system_topology::automatic ||
535         (is_topology_initialized && std::find(numa_nodes_begin, numa_nodes_end, c.numa_id) != numa_nodes_end),
536         "The constraints::numa_id value is not known to the library. Use tbb::info::numa_nodes() to get the list of possible values.");
537 
538     int* core_types_begin = system_topology::core_types_indexes;
539     int* core_types_end = system_topology::core_types_indexes + system_topology::core_types_count;
540     __TBB_ASSERT_RELEASE(c.core_type == system_topology::automatic ||
541         (is_topology_initialized && std::find(core_types_begin, core_types_end, c.core_type) != core_types_end),
542         "The constraints::core_type value is not known to the library. Use tbb::info::core_types() to get the list of possible values.");
543 }
544 
545 int __TBB_EXPORTED_FUNC constraints_default_concurrency(const d1::constraints& c, intptr_t /*reserved*/) {
546     constraints_assertion(c);
547 
548     if (c.numa_id >= 0 || c.core_type >= 0 || c.max_threads_per_core > 0) {
549         system_topology::initialize();
550         return get_default_concurrency_ptr(c.numa_id, c.core_type, c.max_threads_per_core);
551     }
552     return governor::default_num_threads();
553 }
554 
555 int __TBB_EXPORTED_FUNC constraints_threads_per_core(const d1::constraints&, intptr_t /*reserved*/) {
556     return system_topology::automatic;
557 }
558 #endif /* __TBB_ARENA_BINDING */
559 
560 } // namespace r1
561 } // namespace detail
562 } // namespace tbb
563