/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "governor.h"
#include "main.h"
#include "thread_data.h"
#include "market.h"
#include "arena.h"
#include "dynamic_link.h"
#include "concurrent_monitor.h"

#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/global_control.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/info.h"

#include "task_dispatcher.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <atomic>
#include <algorithm>

namespace tbb {
namespace detail {
namespace r1 {

void clear_address_waiter_table();

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
//! Defined in global_control.cpp
bool remove_and_check_if_empty(d1::global_control& gc);
bool is_present(d1::global_control& gc);
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE

namespace rml {
tbb_server* make_private_server( tbb_client& client );
} // namespace rml

//------------------------------------------------------------------------
// governor
//------------------------------------------------------------------------

void governor::acquire_resources () {
#if __TBB_USE_POSIX
    int status = theTLS.create(auto_terminate);
#else
    int status = theTLS.create();
#endif
    if( status )
        handle_perror(status, "TBB failed to initialize task scheduler TLS\n");
    detect_cpu_features(cpu_features);

    is_rethrow_broken = gcc_rethrow_exception_broken();
}

void governor::release_resources () {
    theRMLServerFactory.close();
    destroy_process_mask();

    __TBB_ASSERT(!(__TBB_InitOnce::initialization_done() && theTLS.get()), "TBB is unloaded while thread data still alive?");

    int status = theTLS.destroy();
    if( status )
        runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
    clear_address_waiter_table();

    dynamic_unlink_all();
}

rml::tbb_server* governor::create_rml_server ( rml::tbb_client& client ) {
    rml::tbb_server* server = nullptr;
    if( !UsePrivateRML ) {
        ::rml::factory::status_type status = theRMLServerFactory.make_server( server, client );
        if( status != ::rml::factory::st_success ) {
            UsePrivateRML = true;
            runtime_warning( "rml::tbb_factory::make_server failed with status %x, falling back on private rml", status );
        }
    }
    if ( !server ) {
        __TBB_ASSERT( UsePrivateRML, nullptr );
        server = rml::make_private_server( client );
    }
    __TBB_ASSERT( server, "Failed to create RML server" );
    return server;
}

void governor::one_time_init() {
    if ( !__TBB_InitOnce::initialization_done() ) {
        DoOneTimeInitialization();
    }
}

/*
    There is no portable way to get the stack base address in POSIX; however, modern
    Linux versions provide the pthread_getattr_np API that can be used to obtain a
    thread's stack size and base address. Unfortunately, even this function does not
    provide enough information for the main thread on the IA-64 architecture (the RSE
    spill area and the memory stack are allocated as two separate discontinuous chunks
    of memory), and there is no portable way to discern the main thread from secondary
    threads. Thus for macOS*, and for the IA-64 architecture on Linux* OS, we use the
    TBB worker stack size for all threads and use the current stack top as the stack
    base. This simplified approach is based on the following assumptions:
    1) If the default stack size is insufficient for the user app needs, the required
    amount will be explicitly specified by the user at the point of TBB scheduler
    initialization (as an argument to the tbb::task_scheduler_init constructor).
    2) When an external thread initializes the scheduler, it has enough space on its
    stack. Here "enough" means "at least as much as worker threads have".
    3) If the user app strives to conserve memory by cutting the stack size, it
    should do this for TBB workers too (as in #1).
*/
static std::uintptr_t get_stack_base(std::size_t stack_size) {
    // Stacks grow downward: the highest address is called the "stack base",
    // and the lowest is the "stack limit".
#if USE_WINTHREAD
    suppress_unused_warning(stack_size);
    NT_TIB* pteb = (NT_TIB*)NtCurrentTeb();
    __TBB_ASSERT(&pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB");
    return reinterpret_cast<std::uintptr_t>(pteb->StackBase);
#else /* USE_PTHREAD */
    // There is no portable way to get the stack base address in POSIX, so we use a
    // non-portable method (available on modern Linux) or the simplified approach
    // based on the common-sense assumptions above. The most important assumption
    // is that the main thread's stack size is not less than that of other threads.

    // Points to the lowest addressable byte of the stack.
    void* stack_limit = nullptr;
#if __linux__ && !__bg__
    size_t np_stack_size = 0;
    pthread_attr_t np_attr_stack;
    if (0 == pthread_getattr_np(pthread_self(), &np_attr_stack)) {
        if (0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size)) {
            // A local variable lives on the current stack, so its address must be above the reported limit.
            __TBB_ASSERT( &stack_limit > stack_limit, "stack size must be positive" );
        }
        pthread_attr_destroy(&np_attr_stack);
    }
#endif /* __linux__ */
    std::uintptr_t stack_base{};
    if (stack_limit) {
        stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
    } else {
        // Use an anchor as the stack base address.
        int anchor{};
        stack_base = reinterpret_cast<std::uintptr_t>(&anchor);
    }
    return stack_base;
#endif /* USE_PTHREAD */
}
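
// A sketch of how the value above is consumed (see init_external_thread() below): the
// stack base and the stack size are combined into a "stealing threshold" for the task
// dispatcher, so that a thread stops picking up new stealable work once a large part of
// its stack is already consumed, reducing the risk of stack overflow from deeply nested
// stealing. The exact fraction is defined by calculate_stealing_threshold().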
164 
165 void governor::init_external_thread() {
166     one_time_init();
167     // Create new scheduler instance with arena
168     int num_slots = default_num_threads();
169     // TODO_REVAMP: support an external thread without an implicit arena
170     int num_reserved_slots = 1;
171     unsigned arena_priority_level = 1; // corresponds to tbb::task_arena::priority::normal
172     std::size_t stack_size = 0;
173     arena& a = *market::create_arena(num_slots, num_reserved_slots, arena_priority_level, stack_size);
174     // We need an internal reference to the market. TODO: is it legacy?
175     market::global_market(false);
176     // External thread always occupies the first slot
177     thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
178     td.attach_arena(a, /*slot index*/ 0);
179     __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);
180 
181     stack_size = a.my_market->worker_stack_size();
182     std::uintptr_t stack_base = get_stack_base(stack_size);
183     task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
184     task_disp.set_stealing_threshold(calculate_stealing_threshold(stack_base, stack_size));
185     td.attach_task_dispatcher(task_disp);
186 
187     td.my_arena_slot->occupy();
188     a.my_market->add_external_thread(td);
189     set_thread_data(td);
190 }
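
// Illustrative note (not part of the library): an external (user) thread reaches
// init_external_thread() lazily, the first time it touches the scheduler, e.g. when it
// first runs something like
//     tbb::parallel_for(0, n, [](int) { /* work */ });
// Worker threads are set up along a different path when they join an arena.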

void governor::auto_terminate(void* tls) {
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr ||
        get_thread_data_if_initialized() == tls, nullptr);
    if (tls) {
        thread_data* td = static_cast<thread_data*>(tls);

        // Only an external thread can be inside an arena during termination.
        if (td->my_arena_slot) {
            arena* a = td->my_arena;
            market* m = a->my_market;

            a->my_observers.notify_exit_observers(td->my_last_observer, td->my_is_worker);

            td->my_task_dispatcher->m_stealing_threshold = 0;
            td->detach_task_dispatcher();
            td->my_arena_slot->release();
            // Release the arena
            a->on_thread_leaving<arena::ref_external>();

            m->remove_external_thread(*td);
            // If there was an associated arena, it added a public market reference
            m->release( /*is_public*/ true, /*blocking_terminate*/ false);
        }

        td->~thread_data();
        cache_aligned_deallocate(td);

        clear_thread_data();
    }
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr, nullptr);
}
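
// Note: on POSIX builds auto_terminate is registered as the TLS destructor in
// acquire_resources(), so it runs automatically when a thread with live thread data
// exits; it is also invoked explicitly from finalize_impl() below for the calling
// external thread.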

void governor::initialize_rml_factory () {
    ::rml::factory::status_type res = theRMLServerFactory.open();
    UsePrivateRML = res != ::rml::factory::st_success;
}

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
void __TBB_EXPORTED_FUNC get(d1::task_scheduler_handle& handle) {
    handle.m_ctl = new(allocate_memory(sizeof(global_control))) global_control(global_control::scheduler_handle, 1);
}

void release_impl(d1::task_scheduler_handle& handle) {
    if (handle.m_ctl != nullptr) {
        handle.m_ctl->~global_control();
        deallocate_memory(handle.m_ctl);
        handle.m_ctl = nullptr;
    }
}

bool finalize_impl(d1::task_scheduler_handle& handle) {
    __TBB_ASSERT_RELEASE(handle, "trying to finalize with null handle");
    market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
    bool ok = true; // ok if theMarket does not exist yet
    market* m = market::theMarket; // read the state of theMarket
    if (m != nullptr) {
        lock.release();
        __TBB_ASSERT(is_present(*handle.m_ctl), "finalize or release was already called on this object");
        thread_data* td = governor::get_thread_data_if_initialized();
        if (td) {
            task_dispatcher* task_disp = td->my_task_dispatcher;
            __TBB_ASSERT(task_disp, nullptr);
            if (task_disp->m_properties.outermost && !td->my_is_worker) { // is not inside a parallel region
                governor::auto_terminate(td);
            }
        }
        if (remove_and_check_if_empty(*handle.m_ctl)) {
            ok = m->release(/*is_public*/ true, /*blocking_terminate*/ true);
        } else {
            ok = false;
        }
    }
    return ok;
}

bool __TBB_EXPORTED_FUNC finalize(d1::task_scheduler_handle& handle, std::intptr_t mode) {
    if (mode == d1::release_nothrowing) {
        release_impl(handle);
        return true;
    } else {
        bool ok = finalize_impl(handle);
        // TODO: this is unsafe when finalize is called concurrently, as well as with respect to subsequent library unload
        release_impl(handle);
        if (mode == d1::finalize_throwing && !ok) {
            throw_exception(exception_id::unsafe_wait);
        }
        return ok;
    }
}
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
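
// Illustrative user-side sequence that exercises get()/finalize() above (a sketch based
// on the oneTBB 2021 public API; the exact spelling may differ between releases):
//     tbb::task_scheduler_handle handle = tbb::task_scheduler_handle::get();
//     /* ... parallel work ... */
//     tbb::finalize(handle);   // blocking form; may throw if worker threads cannot be joined
// The nothrow form, tbb::finalize(handle, std::nothrow), returns false instead of throwing.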

#if __TBB_ARENA_BINDING

#if __TBB_WEAK_SYMBOLS_PRESENT
#pragma weak __TBB_internal_initialize_system_topology
#pragma weak __TBB_internal_allocate_binding_handler
#pragma weak __TBB_internal_deallocate_binding_handler
#pragma weak __TBB_internal_apply_affinity
#pragma weak __TBB_internal_restore_affinity
#pragma weak __TBB_internal_get_default_concurrency

extern "C" {
void __TBB_internal_initialize_system_topology(
    size_t groups_num,
    int& numa_nodes_count, int*& numa_indexes_list,
    int& core_types_count, int*& core_types_indexes_list
);

//TODO: consider renaming to `create_binding_handler` and `destroy_binding_handler`
binding_handler* __TBB_internal_allocate_binding_handler( int slot_num, int numa_id, int core_type_id, int max_threads_per_core );
void __TBB_internal_deallocate_binding_handler( binding_handler* handler_ptr );

void __TBB_internal_apply_affinity( binding_handler* handler_ptr, int slot_num );
void __TBB_internal_restore_affinity( binding_handler* handler_ptr, int slot_num );

int __TBB_internal_get_default_concurrency( int numa_id, int core_type_id, int max_threads_per_core );
}
#endif /* __TBB_WEAK_SYMBOLS_PRESENT */

// Stubs that will be used if TBBbind library is unavailable.
static binding_handler* dummy_allocate_binding_handler ( int, int, int, int ) { return nullptr; }
static void dummy_deallocate_binding_handler ( binding_handler* ) { }
static void dummy_apply_affinity ( binding_handler*, int ) { }
static void dummy_restore_affinity ( binding_handler*, int ) { }
static int dummy_get_default_concurrency( int, int, int ) { return governor::default_num_threads(); }

// Handlers for communication with TBBbind
static void (*initialize_system_topology_ptr)(
    size_t groups_num,
    int& numa_nodes_count, int*& numa_indexes_list,
    int& core_types_count, int*& core_types_indexes_list
) = nullptr;

static binding_handler* (*allocate_binding_handler_ptr)( int slot_num, int numa_id, int core_type_id, int max_threads_per_core )
    = dummy_allocate_binding_handler;
static void (*deallocate_binding_handler_ptr)( binding_handler* handler_ptr )
    = dummy_deallocate_binding_handler;
static void (*apply_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
    = dummy_apply_affinity;
static void (*restore_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
    = dummy_restore_affinity;
int (*get_default_concurrency_ptr)( int numa_id, int core_type_id, int max_threads_per_core )
    = dummy_get_default_concurrency;

#if _WIN32 || _WIN64 || __unix__
// Table describing how to link the handlers.
static const dynamic_link_descriptor TbbBindLinkTable[] = {
    DLD(__TBB_internal_initialize_system_topology, initialize_system_topology_ptr),
    DLD(__TBB_internal_allocate_binding_handler, allocate_binding_handler_ptr),
    DLD(__TBB_internal_deallocate_binding_handler, deallocate_binding_handler_ptr),
    DLD(__TBB_internal_apply_affinity, apply_affinity_ptr),
    DLD(__TBB_internal_restore_affinity, restore_affinity_ptr),
    DLD(__TBB_internal_get_default_concurrency, get_default_concurrency_ptr)
};

static const unsigned LinkTableSize = sizeof(TbbBindLinkTable) / sizeof(dynamic_link_descriptor);

#if TBB_USE_DEBUG
#define DEBUG_SUFFIX "_debug"
#else
#define DEBUG_SUFFIX
#endif /* TBB_USE_DEBUG */

#if _WIN32 || _WIN64
#define LIBRARY_EXTENSION ".dll"
#define LIBRARY_PREFIX
#elif __unix__
#define LIBRARY_EXTENSION __TBB_STRING(.so.3)
#define LIBRARY_PREFIX "lib"
#endif /* __unix__ */

#define TBBBIND_NAME LIBRARY_PREFIX "tbbbind" DEBUG_SUFFIX LIBRARY_EXTENSION
#define TBBBIND_2_0_NAME LIBRARY_PREFIX "tbbbind_2_0" DEBUG_SUFFIX LIBRARY_EXTENSION

#define TBBBIND_2_5_NAME LIBRARY_PREFIX "tbbbind_2_5" DEBUG_SUFFIX LIBRARY_EXTENSION
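// For reference, in release builds these macros expand to names such as
// "libtbbbind_2_5.so.3" on Linux* OS and "tbbbind_2_5.dll" on Windows* OS;
// debug builds insert "_debug" before the extension.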
#endif /* _WIN32 || _WIN64 || __unix__ */

// Representation of system hardware topology information on the TBB side.
// System topology may be initialized by a third-party component (e.g. hwloc)
// or just filled in with default stubs.
namespace system_topology {

constexpr int automatic = -1;

static std::atomic<do_once_state> initialization_state;

namespace {
int  numa_nodes_count = 0;
int* numa_nodes_indexes = nullptr;

int  core_types_count = 0;
int* core_types_indexes = nullptr;

const char* load_tbbbind_shared_object() {
#if _WIN32 || _WIN64 || __unix__
#if _WIN32 && !_WIN64
    // For 32-bit Windows applications, process affinity masks can only support up to 32 logical CPUs.
    SYSTEM_INFO si;
    GetNativeSystemInfo(&si);
    if (si.dwNumberOfProcessors > 32) return nullptr;
#endif /* _WIN32 && !_WIN64 */
    for (const auto& tbbbind_version : {TBBBIND_2_5_NAME, TBBBIND_2_0_NAME, TBBBIND_NAME}) {
        if (dynamic_link(tbbbind_version, TbbBindLinkTable, LinkTableSize, nullptr, DYNAMIC_LINK_LOCAL_BINDING)) {
            return tbbbind_version;
        }
    }
#endif /* _WIN32 || _WIN64 || __unix__ */
    return nullptr;
}

int processor_groups_num() {
#if _WIN32
    return NumberOfProcessorGroups();
#else
    // Stub to improve code readability by reducing the number of compile-time conditions
    return 1;
#endif
}
} // internal namespace

// Tries to load the TBBbind shared object; on success, obtains NUMA topology information
// from it, otherwise fills in the NUMA topology with stubs.
void initialization_impl() {
    governor::one_time_init();

    if (const char* tbbbind_name = load_tbbbind_shared_object()) {
        initialize_system_topology_ptr(
            processor_groups_num(),
            numa_nodes_count, numa_nodes_indexes,
            core_types_count, core_types_indexes
        );

        PrintExtraVersionInfo("TBBBIND", tbbbind_name);
        return;
    }

    static int dummy_index = automatic;

    numa_nodes_count = 1;
    numa_nodes_indexes = &dummy_index;

    core_types_count = 1;
    core_types_indexes = &dummy_index;

    PrintExtraVersionInfo("TBBBIND", "UNAVAILABLE");
}
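
// When the stubs are used, the topology degenerates to a single "automatic" entry:
// numa_node_count() and core_type_count() below report 1, the only index is -1
// (system_topology::automatic), and concurrency queries fall back to
// governor::default_num_threads().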

void initialize() {
    atomic_do_once(initialization_impl, initialization_state);
}
} // namespace system_topology

binding_handler* construct_binding_handler(int slot_num, int numa_id, int core_type_id, int max_threads_per_core) {
    system_topology::initialize();
    return allocate_binding_handler_ptr(slot_num, numa_id, core_type_id, max_threads_per_core);
}

void destroy_binding_handler(binding_handler* handler_ptr) {
    __TBB_ASSERT(deallocate_binding_handler_ptr, "tbbbind loading was not performed");
    deallocate_binding_handler_ptr(handler_ptr);
}

void apply_affinity_mask(binding_handler* handler_ptr, int slot_index) {
    __TBB_ASSERT(slot_index >= 0, "Negative thread index");
    __TBB_ASSERT(apply_affinity_ptr, "tbbbind loading was not performed");
    apply_affinity_ptr(handler_ptr, slot_index);
}

void restore_affinity_mask(binding_handler* handler_ptr, int slot_index) {
    __TBB_ASSERT(slot_index >= 0, "Negative thread index");
    __TBB_ASSERT(restore_affinity_ptr, "tbbbind loading was not performed");
    restore_affinity_ptr(handler_ptr, slot_index);
}

unsigned __TBB_EXPORTED_FUNC numa_node_count() {
    system_topology::initialize();
    return system_topology::numa_nodes_count;
}

void __TBB_EXPORTED_FUNC fill_numa_indices(int* index_array) {
    system_topology::initialize();
    std::memcpy(index_array, system_topology::numa_nodes_indexes, system_topology::numa_nodes_count * sizeof(int));
}

int __TBB_EXPORTED_FUNC numa_default_concurrency(int node_id) {
    if (node_id >= 0) {
        system_topology::initialize();
        int result = get_default_concurrency_ptr(
            node_id,
            /*core_type*/system_topology::automatic,
            /*threads_per_core*/system_topology::automatic
        );
        if (result > 0) return result;
    }
    return governor::default_num_threads();
}
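
// These exported entry points back the public tbb::info interface declared in
// oneapi/tbb/info.h (included above). An illustrative caller-side sketch:
//     for (tbb::numa_node_id id : tbb::info::numa_nodes())
//         int threads = tbb::info::default_concurrency(id);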

unsigned __TBB_EXPORTED_FUNC core_type_count(intptr_t /*reserved*/) {
    system_topology::initialize();
    return system_topology::core_types_count;
}

void __TBB_EXPORTED_FUNC fill_core_type_indices(int* index_array, intptr_t /*reserved*/) {
    system_topology::initialize();
    std::memcpy(index_array, system_topology::core_types_indexes, system_topology::core_types_count * sizeof(int));
}

void constraints_assertion(d1::constraints c) {
    bool is_topology_initialized = system_topology::initialization_state == do_once_state::initialized;
    __TBB_ASSERT_RELEASE(c.max_threads_per_core == system_topology::automatic || c.max_threads_per_core > 0,
        "Wrong max_threads_per_core constraints field value.");

    auto numa_nodes_begin = system_topology::numa_nodes_indexes;
    auto numa_nodes_end = system_topology::numa_nodes_indexes + system_topology::numa_nodes_count;
    __TBB_ASSERT_RELEASE(
        c.numa_id == system_topology::automatic ||
        (is_topology_initialized && std::find(numa_nodes_begin, numa_nodes_end, c.numa_id) != numa_nodes_end),
        "The constraints::numa_id value is not known to the library. Use tbb::info::numa_nodes() to get the list of possible values.");

    int* core_types_begin = system_topology::core_types_indexes;
    int* core_types_end = system_topology::core_types_indexes + system_topology::core_types_count;
    __TBB_ASSERT_RELEASE(c.core_type == system_topology::automatic ||
        (is_topology_initialized && std::find(core_types_begin, core_types_end, c.core_type) != core_types_end),
        "The constraints::core_type value is not known to the library. Use tbb::info::core_types() to get the list of possible values.");
}

int __TBB_EXPORTED_FUNC constraints_default_concurrency(const d1::constraints& c, intptr_t /*reserved*/) {
    constraints_assertion(c);

    if (c.numa_id >= 0 || c.core_type >= 0 || c.max_threads_per_core > 0) {
        system_topology::initialize();
        return get_default_concurrency_ptr(c.numa_id, c.core_type, c.max_threads_per_core);
    }
    return governor::default_num_threads();
}
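
// Caller-side sketch (hypothetical values): constraints are normally built through
// tbb::task_arena::constraints and passed to a tbb::task_arena, which in turn queries
// the default concurrency through the entry point above:
//     tbb::task_arena::constraints c;
//     c.numa_id = some_numa_node;     // a value obtained from tbb::info::numa_nodes()
//     c.max_threads_per_core = 1;
//     tbb::task_arena arena(c);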

int __TBB_EXPORTED_FUNC constraints_threads_per_core(const d1::constraints&, intptr_t /*reserved*/) {
    return system_topology::automatic;
}
#endif /* __TBB_ARENA_BINDING */

} // namespace r1
} // namespace detail
} // namespace tbb