// /oneTBB/src/tbb/governor.cpp (revision b15aabb3)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "governor.h"
#include "main.h"
#include "thread_data.h"
#include "market.h"
#include "arena.h"
#include "dynamic_link.h"

#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/global_control.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/info.h"

#include "task_dispatcher.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <atomic>
#include <algorithm>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
//! global_control.cpp contains definition
bool remove_and_check_if_empty(d1::global_control& gc);
bool is_present(d1::global_control& gc);
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE

namespace rml {
tbb_server* make_private_server( tbb_client& client );
} // namespace rml

//------------------------------------------------------------------------
// governor
//------------------------------------------------------------------------

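// Acquires process-wide resources: the TLS key that stores per-thread scheduler data
// (with auto_terminate registered as its destructor on POSIX) and detected CPU features.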
void governor::acquire_resources () {
#if __TBB_USE_POSIX
    int status = theTLS.create(auto_terminate);
#else
    int status = theTLS.create();
#endif
    if( status )
        handle_perror(status, "TBB failed to initialize task scheduler TLS\n");
    detect_cpu_features(cpu_features);
    is_rethrow_broken = gcc_rethrow_exception_broken();
}

void governor::release_resources () {
    theRMLServerFactory.close();
    destroy_process_mask();

    __TBB_ASSERT(!(__TBB_InitOnce::initialization_done() && theTLS.get()), "TBB is unloaded while thread data still alive?");

    int status = theTLS.destroy();
    if( status )
        runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
    dynamic_unlink_all();
}

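// Creates an RML server for the given client. The shared RML factory is tried first;
// if it cannot provide a server, the library falls back to its private RML server.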
rml::tbb_server* governor::create_rml_server ( rml::tbb_client& client ) {
    rml::tbb_server* server = NULL;
    if( !UsePrivateRML ) {
        ::rml::factory::status_type status = theRMLServerFactory.make_server( server, client );
        if( status != ::rml::factory::st_success ) {
            UsePrivateRML = true;
            runtime_warning( "rml::tbb_factory::make_server failed with status %x, falling back on private rml", status );
        }
    }
    if ( !server ) {
        __TBB_ASSERT( UsePrivateRML, NULL );
        server = rml::make_private_server( client );
    }
    __TBB_ASSERT( server, "Failed to create RML server" );
    return server;
}

void governor::one_time_init() {
    if ( !__TBB_InitOnce::initialization_done() ) {
        DoOneTimeInitialization();
    }
}

/*
    There is no portable way to get the stack base address in POSIX. However, modern
    Linux versions provide the pthread_getattr_np API that can be used to obtain a
    thread's stack size and base address. Unfortunately, even this function does not
    provide enough information for the main thread on the IA-64 architecture (the RSE
    spill area and the memory stack are allocated as two separate discontinuous chunks
    of memory), and there is no portable way to discern the main thread from secondary
    threads. Thus for macOS*, and for the IA-64 architecture on Linux* OS, we use the
    TBB worker stack size for all threads and use the current stack top as the stack
    base. This simplified approach is based on the following assumptions:
    1) If the default stack size is insufficient for the user app's needs, the required
    amount will be explicitly specified by the user at the point of TBB scheduler
    initialization (as an argument to the tbb::task_scheduler_init constructor).
    2) When an external thread initializes the scheduler, it has enough space on its
    stack. Here "enough" means "at least as much as worker threads have".
    3) If the user app strives to conserve memory by cutting the stack size, it should
    do this for TBB workers too (as in assumption 1).
*/
static std::uintptr_t get_stack_base(std::size_t stack_size) {
    // Stacks grow top-down. The highest address is called the "stack base",
    // and the lowest is the "stack limit".
#if USE_WINTHREAD
    suppress_unused_warning(stack_size);
    NT_TIB* pteb = (NT_TIB*)NtCurrentTeb();
    __TBB_ASSERT(&pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB");
    return reinterpret_cast<std::uintptr_t>(pteb->StackBase);
#else /* USE_PTHREAD */
    // There is no portable way to get the stack base address in POSIX, so we either use
    // a non-portable method (available on modern Linux) or fall back to the simplified
    // approach based on common-sense assumptions. The most important assumption is that
    // the main thread's stack size is not less than that of other threads.

    // Points to the lowest addressable byte of a stack.
    void* stack_limit = nullptr;
#if __linux__ && !__bg__
    size_t np_stack_size = 0;
    pthread_attr_t np_attr_stack;
    if (0 == pthread_getattr_np(pthread_self(), &np_attr_stack)) {
        if (0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size)) {
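            // The address of a local variable lives on the current stack, so it must
            // lie above the stack limit reported by pthread_attr_getstack.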
            __TBB_ASSERT( &stack_limit > stack_limit, "stack size must be positive" );
        }
        pthread_attr_destroy(&np_attr_stack);
    }
#endif /* __linux__ */
    std::uintptr_t stack_base{};
    if (stack_limit) {
        stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
    } else {
        // Use an anchor as a base stack address.
        int anchor{};
        stack_base = reinterpret_cast<std::uintptr_t>(&anchor);
    }
    return stack_base;
#endif /* USE_PTHREAD */
}

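// Lazily initializes the scheduler for an external (non-worker) thread: creates an
// implicit arena, occupies its first slot, and attaches a task dispatcher whose
// stealing threshold is derived from this thread's stack bounds.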
void governor::init_external_thread() {
    one_time_init();
    // Create new scheduler instance with arena
    int num_slots = default_num_threads();
    // TODO_REVAMP: support an external thread without an implicit arena
    int num_reserved_slots = 1;
    unsigned arena_priority_level = 1; // corresponds to tbb::task_arena::priority::normal
    std::size_t stack_size = 0;
    arena& a = *market::create_arena(num_slots, num_reserved_slots, arena_priority_level, stack_size);
    // We need an internal reference to the market. TODO: is it legacy?
    market::global_market(false);
    // External thread always occupies the first slot
    thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
    td.attach_arena(a, /*slot index*/ 0);

    stack_size = a.my_market->worker_stack_size();
    std::uintptr_t stack_base = get_stack_base(stack_size);
    task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
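    // The stealing threshold marks a point on this thread's stack below which the
    // dispatcher stops taking new work, guarding against stack overflow from deeply
    // nested task execution.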
    task_disp.set_stealing_threshold(calculate_stealing_threshold(stack_base, stack_size));
    td.attach_task_dispatcher(task_disp);

    td.my_arena_slot->occupy();
    a.my_market->add_external_thread(td);
    set_thread_data(td);
}

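// TLS destructor and explicit termination path: if the thread is still attached to
// an arena, detach it and drop the associated arena and market references before
// destroying the thread_data object.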
void governor::auto_terminate(void* tls) {
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr ||
        get_thread_data_if_initialized() == tls, NULL);
    if (tls) {
        thread_data* td = static_cast<thread_data*>(tls);

        // Only external thread can be inside an arena during termination.
        if (td->my_arena_slot) {
            arena* a = td->my_arena;
            market* m = a->my_market;

            a->my_observers.notify_exit_observers(td->my_last_observer, td->my_is_worker);

            td->my_task_dispatcher->m_stealing_threshold = 0;
            td->detach_task_dispatcher();
            td->my_arena_slot->release();
            // Release an arena
            a->on_thread_leaving<arena::ref_external>();

            m->remove_external_thread(*td);
            // If there was an associated arena, it added a public market reference
            m->release( /*is_public*/ true, /*blocking_terminate*/ false);
        }

        td->~thread_data();
        cache_aligned_deallocate(td);

        clear_thread_data();
    }
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr, NULL);
}

void governor::initialize_rml_factory () {
    ::rml::factory::status_type res = theRMLServerFactory.open();
    UsePrivateRML = res != ::rml::factory::st_success;
}

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
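// Binds a task_scheduler_handle to a dedicated global_control object of the
// scheduler_handle kind.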
void __TBB_EXPORTED_FUNC get(d1::task_scheduler_handle& handle) {
    handle.m_ctl = new(allocate_memory(sizeof(global_control))) global_control(global_control::scheduler_handle, 1);
}

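// Non-blocking release: destroys the handle's global_control object without waiting
// for worker threads.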
void release_impl(d1::task_scheduler_handle& handle) {
    if (handle.m_ctl != nullptr) {
        handle.m_ctl->~global_control();
        deallocate_memory(handle.m_ctl);
        handle.m_ctl = nullptr;
    }
}

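// Blocking finalization: auto-terminates the calling thread's implicit scheduler when
// it is not inside a parallel region and, once the last handle is removed, releases
// the market with blocking_terminate so that worker threads are waited for.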
bool finalize_impl(d1::task_scheduler_handle& handle) {
    market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
    bool ok = true; // ok if theMarket does not exist yet
    market* m = market::theMarket; // read the state of theMarket
    if (m != nullptr) {
        lock.release();
        __TBB_ASSERT(is_present(*handle.m_ctl), "finalize or release was already called on this object");
        thread_data* td = governor::get_thread_data_if_initialized();
        if (td) {
            task_dispatcher* task_disp = td->my_task_dispatcher;
            __TBB_ASSERT(task_disp, nullptr);
            if (task_disp->m_properties.outermost && !td->my_is_worker) { // is not inside a parallel region
                governor::auto_terminate(td);
            }
        }
        if (remove_and_check_if_empty(*handle.m_ctl)) {
            ok = m->release(/*is_public*/ true, /*blocking_terminate*/ true);
        } else {
            ok = false;
        }
    }
    return ok;
}

bool __TBB_EXPORTED_FUNC finalize(d1::task_scheduler_handle& handle, std::intptr_t mode) {
    if (mode == d1::release_nothrowing) {
        release_impl(handle);
        return true;
    } else {
        bool ok = finalize_impl(handle);
        // TODO: this is unsafe when finalize is called concurrently and the library is then unloaded
        release_impl(handle);
        if (mode == d1::finalize_throwing && !ok) {
            throw_exception(exception_id::unsafe_wait);
        }
        return ok;
    }
}
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE

#if __TBB_ARENA_BINDING

#if __TBB_WEAK_SYMBOLS_PRESENT
#pragma weak __TBB_internal_initialize_system_topology
#pragma weak __TBB_internal_allocate_binding_handler
#pragma weak __TBB_internal_deallocate_binding_handler
#pragma weak __TBB_internal_apply_affinity
#pragma weak __TBB_internal_restore_affinity
#pragma weak __TBB_internal_get_default_concurrency

extern "C" {
void __TBB_internal_initialize_system_topology(
    size_t groups_num,
    int& numa_nodes_count, int*& numa_indexes_list,
    int& core_types_count, int*& core_types_indexes_list
);

//TODO: consider renaming to `create_binding_handler` and `destroy_binding_handler`
binding_handler* __TBB_internal_allocate_binding_handler( int slot_num, int numa_id, int core_type_id, int max_threads_per_core );
void __TBB_internal_deallocate_binding_handler( binding_handler* handler_ptr );

void __TBB_internal_apply_affinity( binding_handler* handler_ptr, int slot_num );
void __TBB_internal_restore_affinity( binding_handler* handler_ptr, int slot_num );

int __TBB_internal_get_default_concurrency( int numa_id, int core_type_id, int max_threads_per_core );
}
#endif /* __TBB_WEAK_SYMBOLS_PRESENT */

// Stubs that will be used if TBBbind library is unavailable.
static binding_handler* dummy_allocate_binding_handler ( int, int, int, int ) { return nullptr; }
static void dummy_deallocate_binding_handler ( binding_handler* ) { }
static void dummy_apply_affinity ( binding_handler*, int ) { }
static void dummy_restore_affinity ( binding_handler*, int ) { }
static int dummy_get_default_concurrency( int, int, int ) { return governor::default_num_threads(); }

// Handlers for communication with TBBbind
static void (*initialize_system_topology_ptr)(
    size_t groups_num,
    int& numa_nodes_count, int*& numa_indexes_list,
    int& core_types_count, int*& core_types_indexes_list
) = nullptr;

static binding_handler* (*allocate_binding_handler_ptr)( int slot_num, int numa_id, int core_type_id, int max_threads_per_core )
    = dummy_allocate_binding_handler;
static void (*deallocate_binding_handler_ptr)( binding_handler* handler_ptr )
    = dummy_deallocate_binding_handler;
static void (*apply_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
    = dummy_apply_affinity;
static void (*restore_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
    = dummy_restore_affinity;
int (*get_default_concurrency_ptr)( int numa_id, int core_type_id, int max_threads_per_core )
    = dummy_get_default_concurrency;

#if _WIN32 || _WIN64 || __linux__
// Table describing how to link the handlers.
static const dynamic_link_descriptor TbbBindLinkTable[] = {
    DLD(__TBB_internal_initialize_system_topology, initialize_system_topology_ptr),
    DLD(__TBB_internal_allocate_binding_handler, allocate_binding_handler_ptr),
    DLD(__TBB_internal_deallocate_binding_handler, deallocate_binding_handler_ptr),
    DLD(__TBB_internal_apply_affinity, apply_affinity_ptr),
    DLD(__TBB_internal_restore_affinity, restore_affinity_ptr),
    DLD(__TBB_internal_get_default_concurrency, get_default_concurrency_ptr)
};

static const unsigned LinkTableSize = sizeof(TbbBindLinkTable) / sizeof(dynamic_link_descriptor);

#if TBB_USE_DEBUG
#define DEBUG_SUFFIX "_debug"
#else
#define DEBUG_SUFFIX
#endif /* TBB_USE_DEBUG */

#if _WIN32 || _WIN64
#define LIBRARY_EXTENSION ".dll"
#define LIBRARY_PREFIX
#elif __linux__
#define LIBRARY_EXTENSION __TBB_STRING(.so.3)
#define LIBRARY_PREFIX "lib"
#endif /* __linux__ */

#define TBBBIND_NAME LIBRARY_PREFIX "tbbbind" DEBUG_SUFFIX LIBRARY_EXTENSION
#define TBBBIND_2_0_NAME LIBRARY_PREFIX "tbbbind_2_0" DEBUG_SUFFIX LIBRARY_EXTENSION
#define TBBBIND_2_4_NAME LIBRARY_PREFIX "tbbbind_2_4" DEBUG_SUFFIX LIBRARY_EXTENSION
#endif /* _WIN32 || _WIN64 || __linux__ */

// Representation of system hardware topology information on the TBB side.
// The system topology may be initialized by a third-party component (e.g. hwloc)
// or just filled in with default stubs.
namespace system_topology {

constexpr int automatic = -1;

static std::atomic<do_once_state> initialization_state;

namespace {
int  numa_nodes_count = 0;
int* numa_nodes_indexes = nullptr;

int  core_types_count = 0;
int* core_types_indexes = nullptr;

const char* load_tbbbind_shared_object() {
#if _WIN32 || _WIN64 || __linux__
#if _WIN32 && !_WIN64
    // For 32-bit Windows applications, process affinity masks can only support up to 32 logical CPUs.
    SYSTEM_INFO si;
    GetNativeSystemInfo(&si);
    if (si.dwNumberOfProcessors > 32) return nullptr;
#endif /* _WIN32 && !_WIN64 */
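    // Try the newest tbbbind ABI first, falling back to the older versions.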
    for (const auto& tbbbind_version : {TBBBIND_2_4_NAME, TBBBIND_2_0_NAME, TBBBIND_NAME}) {
        if (dynamic_link(tbbbind_version, TbbBindLinkTable, LinkTableSize)) {
            return tbbbind_version;
        }
    }
#endif /* _WIN32 || _WIN64 || __linux__ */
    return nullptr;
}

int processor_groups_num() {
#if _WIN32
    return NumberOfProcessorGroups();
#else
    // Stub that improves code readability by reducing the number of compile-time conditions
    return 1;
#endif
}
} // internal namespace

// Tries to load the TBBbind library. On success, obtains the NUMA topology information
// from it; otherwise, fills the NUMA topology with stubs.
void initialization_impl() {
    governor::one_time_init();

    if (const char* tbbbind_name = load_tbbbind_shared_object()) {
        initialize_system_topology_ptr(
            processor_groups_num(),
            numa_nodes_count, numa_nodes_indexes,
            core_types_count, core_types_indexes
        );

        PrintExtraVersionInfo("TBBBIND", tbbbind_name);
        return;
    }

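    // tbbbind could not be loaded: report a single NUMA node and a single core type,
    // both with the 'automatic' index.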
    static int dummy_index = automatic;

    numa_nodes_count = 1;
    numa_nodes_indexes = &dummy_index;

    core_types_count = 1;
    core_types_indexes = &dummy_index;

    PrintExtraVersionInfo("TBBBIND", "UNAVAILABLE");
}

void initialize() {
    atomic_do_once(initialization_impl, initialization_state);
}
} // namespace system_topology

binding_handler* construct_binding_handler(int slot_num, int numa_id, int core_type_id, int max_threads_per_core) {
    system_topology::initialize();
    return allocate_binding_handler_ptr(slot_num, numa_id, core_type_id, max_threads_per_core);
}

void destroy_binding_handler(binding_handler* handler_ptr) {
    __TBB_ASSERT(deallocate_binding_handler_ptr, "tbbbind loading was not performed");
    deallocate_binding_handler_ptr(handler_ptr);
}

void apply_affinity_mask(binding_handler* handler_ptr, int slot_index) {
    __TBB_ASSERT(slot_index >= 0, "Negative thread index");
    __TBB_ASSERT(apply_affinity_ptr, "tbbbind loading was not performed");
    apply_affinity_ptr(handler_ptr, slot_index);
}

void restore_affinity_mask(binding_handler* handler_ptr, int slot_index) {
    __TBB_ASSERT(slot_index >= 0, "Negative thread index");
    __TBB_ASSERT(restore_affinity_ptr, "tbbbind loading was not performed");
    restore_affinity_ptr(handler_ptr, slot_index);
}

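// Exported topology queries; the system topology is initialized on demand.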
unsigned __TBB_EXPORTED_FUNC numa_node_count() {
    system_topology::initialize();
    return system_topology::numa_nodes_count;
}

void __TBB_EXPORTED_FUNC fill_numa_indices(int* index_array) {
    system_topology::initialize();
    std::memcpy(index_array, system_topology::numa_nodes_indexes, system_topology::numa_nodes_count * sizeof(int));
}

int __TBB_EXPORTED_FUNC numa_default_concurrency(int node_id) {
    if (node_id >= 0) {
        system_topology::initialize();
        int result = get_default_concurrency_ptr(
            node_id,
            /*core_type*/system_topology::automatic,
            /*threads_per_core*/system_topology::automatic
        );
        if (result > 0) return result;
    }
    return governor::default_num_threads();
}

unsigned __TBB_EXPORTED_FUNC core_type_count(intptr_t /*reserved*/) {
    system_topology::initialize();
    return system_topology::core_types_count;
}

void __TBB_EXPORTED_FUNC fill_core_type_indices(int* index_array, intptr_t /*reserved*/) {
    system_topology::initialize();
    std::memcpy(index_array, system_topology::core_types_indexes, system_topology::core_types_count * sizeof(int));
}

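// Validates a constraints object against the detected topology; the checks stay active
// even in release builds (__TBB_ASSERT_RELEASE).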
void constraints_assertion(d1::constraints c) {
    bool is_topology_initialized = system_topology::initialization_state == do_once_state::initialized;
    __TBB_ASSERT_RELEASE(c.max_threads_per_core == system_topology::automatic || c.max_threads_per_core > 0,
        "Wrong max_threads_per_core constraints field value.");

    auto numa_nodes_begin = system_topology::numa_nodes_indexes;
    auto numa_nodes_end = system_topology::numa_nodes_indexes + system_topology::numa_nodes_count;
    __TBB_ASSERT_RELEASE(
        c.numa_id == system_topology::automatic ||
        (is_topology_initialized && std::find(numa_nodes_begin, numa_nodes_end, c.numa_id) != numa_nodes_end),
        "The constraints::numa_id value is not known to the library. Use tbb::info::numa_nodes() to get the list of possible values.");

    int* core_types_begin = system_topology::core_types_indexes;
    int* core_types_end = system_topology::core_types_indexes + system_topology::core_types_count;
    __TBB_ASSERT_RELEASE(c.core_type == system_topology::automatic ||
        (is_topology_initialized && std::find(core_types_begin, core_types_end, c.core_type) != core_types_end),
        "The constraints::core_type value is not known to the library. Use tbb::info::core_types() to get the list of possible values.");
}

int __TBB_EXPORTED_FUNC constraints_default_concurrency(const d1::constraints& c, intptr_t /*reserved*/) {
    constraints_assertion(c);

    if (c.numa_id >= 0 || c.core_type >= 0 || c.max_threads_per_core > 0) {
        system_topology::initialize();
        return get_default_concurrency_ptr(c.numa_id, c.core_type, c.max_threads_per_core);
    }
    return governor::default_num_threads();
}

int __TBB_EXPORTED_FUNC constraints_threads_per_core(const d1::constraints&, intptr_t /*reserved*/) {
    return system_topology::automatic;
}
#endif /* __TBB_ARENA_BINDING */

} // namespace r1
} // namespace detail
} // namespace tbb