xref: /oneTBB/src/tbb/governor.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "governor.h"
18 #include "main.h"
19 #include "thread_data.h"
20 #include "market.h"
21 #include "arena.h"
22 #include "dynamic_link.h"
23 
24 #include "oneapi/tbb/task_group.h"
25 #include "oneapi/tbb/global_control.h"
26 #include "oneapi/tbb/tbb_allocator.h"
27 
28 #include "task_dispatcher.h"
29 
30 #include <cstdio>
31 #include <cstdlib>
32 #include <cstring>
33 #include <atomic>
34 
35 namespace tbb {
36 namespace detail {
37 namespace r1 {
38 
39 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
40 //! Definitions reside in global_control.cpp
41 bool remove_and_check_if_empty(d1::global_control& gc);
42 bool is_present(d1::global_control& gc);
43 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
44 
45 namespace rml {
46 tbb_server* make_private_server( tbb_client& client );
47 } // namespace rml
48 
49 //------------------------------------------------------------------------
50 // governor
51 //------------------------------------------------------------------------
52 
53 void governor::acquire_resources () {
54 #if __TBB_USE_POSIX
55     int status = theTLS.create(auto_terminate);
56 #else
57     int status = theTLS.create();
58 #endif
59     if( status )
60         handle_perror(status, "TBB failed to initialize task scheduler TLS\n");
61     detect_cpu_features(cpu_features);
62     is_rethrow_broken = gcc_rethrow_exception_broken();
63 }
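
/*
    The POSIX branch above registers auto_terminate() as the TLS destructor, so it runs
    automatically when a thread that used the scheduler exits. A minimal sketch of the
    underlying mechanism, assuming the TLS wrapper is built on pthread keys (names below
    are hypothetical):

        #include <pthread.h>

        static pthread_key_t scheduler_key;

        static void thread_exit_hook(void* value) {
            // pthread calls this at thread exit for every thread that stored a
            // non-NULL value; governor::auto_terminate plays this role for TBB.
        }

        static int create_scheduler_key() {
            // Returns 0 on success or an errno value, matching the `status`
            // check in acquire_resources() above.
            return pthread_key_create(&scheduler_key, thread_exit_hook);
        }
*/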
64 
65 void governor::release_resources () {
66     theRMLServerFactory.close();
67     destroy_process_mask();
68 
69     __TBB_ASSERT(!(__TBB_InitOnce::initialization_done() && theTLS.get()), "TBB is unloaded while thread data still alive?");
70 
71     int status = theTLS.destroy();
72     if( status )
73         runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
74     dynamic_unlink_all();
75 }
76 
77 rml::tbb_server* governor::create_rml_server ( rml::tbb_client& client ) {
78     rml::tbb_server* server = NULL;
79     if( !UsePrivateRML ) {
80         ::rml::factory::status_type status = theRMLServerFactory.make_server( server, client );
81         if( status != ::rml::factory::st_success ) {
82             UsePrivateRML = true;
83             runtime_warning( "rml::tbb_factory::make_server failed with status %x, falling back on private rml", status );
84         }
85     }
86     if ( !server ) {
87         __TBB_ASSERT( UsePrivateRML, NULL );
88         server = rml::make_private_server( client );
89     }
90     __TBB_ASSERT( server, "Failed to create RML server" );
91     return server;
92 }
93 
94 void governor::one_time_init() {
95     if ( !__TBB_InitOnce::initialization_done() ) {
96         DoOneTimeInitialization();
97     }
98 }
99 
100 /*
101     There is no portable way to get the stack base address in POSIX; however, modern
102     Linux versions provide the pthread_getattr_np API that can be used to obtain a
103     thread's stack size and base address. Unfortunately, even this function does not
104     provide enough information for the main thread on the IA-64 architecture (the RSE
105     spill area and the memory stack are allocated as two separate discontinuous chunks
106     of memory), and there is no portable way to discern the main thread from secondary
107     threads. Thus for macOS* and for IA-64 architecture Linux* OS we use the TBB worker
108     stack size for all threads and use the current stack top as the stack base. This
109     simplified approach is based on the following assumptions:
110     1) If the default stack size is insufficient for the user app's needs, the
111     required amount will be explicitly specified by the user at the point of
112     TBB scheduler initialization (as an argument to the tbb::task_scheduler_init
113     constructor).
114     2) When a master thread initializes the scheduler, it has enough space on its
115     stack. Here "enough" means "at least as much as worker threads have".
116     3) If the user app strives to conserve memory by cutting the stack size, it
117     should do this for TBB workers too (as in #1).
118 */
119 static std::uintptr_t get_stack_base(std::size_t stack_size) {
120     // Stacks grow top-down. The highest address is called the "stack base",
121     // and the lowest one the "stack limit".
122 #if USE_WINTHREAD
123     suppress_unused_warning(stack_size);
124     NT_TIB* pteb = (NT_TIB*)NtCurrentTeb();
125     __TBB_ASSERT(&pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB");
126     return reinterpret_cast<std::uintptr_t>(pteb->StackBase);
127 #else /* USE_PTHREAD */
128     // There is no portable way to get the stack base address in POSIX, so we use
129     // a non-portable method (on modern Linux) or the simplified approach
130     // based on the common-sense assumptions above. The most important assumption
131     // is that the main thread's stack size is not less than that of other threads.
132 
133     // Points to the lowest addressable byte of a stack.
134     void* stack_limit = nullptr;
135 #if __linux__ && !__bg__
136     size_t np_stack_size = 0;
137     pthread_attr_t np_attr_stack;
138     if (0 == pthread_getattr_np(pthread_self(), &np_attr_stack)) {
139         if (0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size)) {
140             __TBB_ASSERT( (void*)&stack_limit > stack_limit, "stack limit must be below the current stack frame" );
141         }
142         pthread_attr_destroy(&np_attr_stack);
143     }
144 #endif /* __linux__ */
145     std::uintptr_t stack_base{};
146     if (stack_limit) {
147         stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
148     } else {
149         // Use an anchor as the stack base address.
150         int anchor{};
151         stack_base = reinterpret_cast<std::uintptr_t>(&anchor);
152     }
153     return stack_base;
154 #endif /* USE_PTHREAD */
155 }
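
/*
    The value returned here feeds calculate_stealing_threshold() in init_external_thread()
    below. A worked sketch, assuming the threshold sits roughly in the middle of the stack
    (the exact formula is defined elsewhere in the scheduler and may differ):

        std::size_t    stack_size = 0x100000;                      // 1 MiB, hypothetical
        std::uintptr_t stack_base = 0x7ffff0000000;                // hypothetical base address
        std::uintptr_t threshold  = stack_base - stack_size / 2;   // 0x7fffeff80000

    A dispatcher whose current stack pointer drops below the threshold stops stealing new
    work, which bounds recursion depth and guards against stack overflow.
*/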
156 
157 void governor::init_external_thread() {
158     one_time_init();
159     // Create new scheduler instance with arena
160     int num_slots = default_num_threads();
161     // TODO_REVAMP: support arena-less masters
162     int num_reserved_slots = 1;
163     unsigned arena_priority_level = 1; // corresponds to tbb::task_arena::priority::normal
164     std::size_t stack_size = 0;
165     arena& a = *market::create_arena(num_slots, num_reserved_slots, arena_priority_level, stack_size);
166     // We need an internal reference to the market. TODO: is it legacy?
167     market::global_market(false);
168     // Master thread always occupies the first slot
169     thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
170     td.attach_arena(a, /*slot index*/ 0);
171 
172     stack_size = a.my_market->worker_stack_size();
173     std::uintptr_t stack_base = get_stack_base(stack_size);
174     task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
175     task_disp.set_stealing_threshold(calculate_stealing_threshold(stack_base, stack_size));
176     td.attach_task_dispatcher(task_disp);
177 
178     td.my_arena_slot->occupy();
179     a.my_market->add_external_thread(td);
180     set_thread_data(td);
181 }
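
/*
    init_external_thread() is not called by users directly; the governor invokes it lazily
    the first time a thread without thread_data enters the scheduler. A minimal usage
    sketch from the application side (illustrative only):

        #include <oneapi/tbb/parallel_for.h>

        int main() {
            // The calling thread has no thread_data yet; the first parallel algorithm
            // implicitly creates an arena, occupies slot 0 as an external ("master")
            // thread, and sets up its task dispatcher and stealing threshold.
            oneapi::tbb::parallel_for(0, 100, [](int) { });
            return 0;
        }
*/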
182 
183 void governor::auto_terminate(void* tls) {
184     __TBB_ASSERT(get_thread_data_if_initialized() == nullptr ||
185         get_thread_data_if_initialized() == tls, NULL);
186     if (tls) {
187         thread_data* td = static_cast<thread_data*>(tls);
188 
189         // Only an external thread can be inside an arena during termination.
190         if (td->my_arena_slot) {
191             arena* a = td->my_arena;
192             market* m = a->my_market;
193 
194             a->my_observers.notify_exit_observers(td->my_last_observer, td->my_is_worker);
195 
196             td->my_task_dispatcher->m_stealing_threshold = 0;
197             td->detach_task_dispatcher();
198             td->my_arena_slot->release();
199             // Release an arena
200             a->on_thread_leaving<arena::ref_external>();
201 
202             m->remove_external_thread(*td);
203             // If there was an associated arena, it added a public market reference
204             m->release( /*is_public*/ true, /*blocking_terminate*/ false);
205         }
206 
207         td->~thread_data();
208         cache_aligned_deallocate(td);
209 
210         clear_thread_data();
211     }
212     __TBB_ASSERT(get_thread_data_if_initialized() == nullptr, NULL);
213 }
214 
215 void governor::initialize_rml_factory () {
216     ::rml::factory::status_type res = theRMLServerFactory.open();
217     UsePrivateRML = res != ::rml::factory::st_success;
218 }
219 
220 #if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
221 void __TBB_EXPORTED_FUNC get(d1::task_scheduler_handle& handle) {
222     handle.m_ctl = new(allocate_memory(sizeof(global_control))) global_control(global_control::scheduler_handle, 1);
223 }
224 
225 void release_impl(d1::task_scheduler_handle& handle) {
226     if (handle.m_ctl != nullptr) {
227         handle.m_ctl->~global_control();
228         deallocate_memory(handle.m_ctl);
229         handle.m_ctl = nullptr;
230     }
231 }
232 
233 bool finalize_impl(d1::task_scheduler_handle& handle) {
234     market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
235     bool ok = true; // ok if theMarket does not exist yet
236     market* m = market::theMarket; // read the state of theMarket
237     if (m != nullptr) {
238         lock.release();
239         __TBB_ASSERT(is_present(*handle.m_ctl), "finalize or release was already called on this object");
240         thread_data* td = governor::get_thread_data_if_initialized();
241         if (td) {
242             task_dispatcher* task_disp = td->my_task_dispatcher;
243             __TBB_ASSERT(task_disp, nullptr);
244             if (task_disp->m_properties.outermost && !td->my_is_worker) { // is not inside a parallel region
245                 governor::auto_terminate(td);
246             }
247         }
248         if (remove_and_check_if_empty(*handle.m_ctl)) {
249             ok = m->release(/*is_public*/ true, /*blocking_terminate*/ true);
250         } else {
251             ok = false;
252         }
253     }
254     return ok;
255 }
256 
257 bool __TBB_EXPORTED_FUNC finalize(d1::task_scheduler_handle& handle, std::intptr_t mode) {
258     if (mode == d1::release_nothrowing) {
259         release_impl(handle);
260         return true;
261     } else {
262         bool ok = finalize_impl(handle);
263         // TODO: it is unsafe if finalize is called concurrently with a subsequent library unload
264         release_impl(handle);
265         if (mode == d1::finalize_throwing && !ok) {
266             throw_exception(exception_id::unsafe_wait);
267         }
268         return ok;
269     }
270 }
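
/*
    get()/finalize() above back the preview task_scheduler_handle API. A user-side sketch,
    assuming the preview macro and public wrappers of this revision
    (TBB_PREVIEW_WAITING_FOR_WORKERS, tbb::task_scheduler_handle, tbb::finalize):

        #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
        #include <oneapi/tbb/global_control.h>
        #include <oneapi/tbb/parallel_for.h>

        int main() {
            oneapi::tbb::task_scheduler_handle handle = oneapi::tbb::task_scheduler_handle::get();
            oneapi::tbb::parallel_for(0, 100, [](int) { });
            // Waits for worker threads to complete; maps to finalize_impl() above with
            // blocking_terminate == true and throws on failure (finalize_throwing mode).
            oneapi::tbb::finalize(handle);
            return 0;
        }
*/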
271 #endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
272 
273 #if __TBB_NUMA_SUPPORT
274 
275 #if __TBB_WEAK_SYMBOLS_PRESENT
276 #pragma weak __TBB_internal_initialize_numa_topology
277 #pragma weak __TBB_internal_allocate_binding_handler
278 #pragma weak __TBB_internal_deallocate_binding_handler
279 #pragma weak __TBB_internal_bind_to_node
280 #pragma weak __TBB_internal_restore_affinity
281 
282 extern "C" {
283 void __TBB_internal_initialize_numa_topology(
284     size_t groups_num, int& nodes_count, int*& indexes_list, int*& concurrency_list );
285 
286 // TODO: consider renaming to `create_binding_handler` and `destroy_binding_handler`
287 binding_handler* __TBB_internal_allocate_binding_handler( int slot_num );
288 void __TBB_internal_deallocate_binding_handler( binding_handler* handler_ptr );
289 
290 void __TBB_internal_bind_to_node( binding_handler* handler_ptr, int slot_num, int numa_id );
291 void __TBB_internal_restore_affinity( binding_handler* handler_ptr, int slot_num );
292 }
293 #endif /* __TBB_WEAK_SYMBOLS_PRESENT */
294 
295 // Handlers for communication with TBBBind
296 static void (*initialize_numa_topology_ptr)(
297     size_t groups_num, int& nodes_count, int*& indexes_list, int*& concurrency_list ) = NULL;
298 
299 static binding_handler* (*allocate_binding_handler_ptr)( int slot_num ) = NULL;
300 static void (*deallocate_binding_handler_ptr)( binding_handler* handler_ptr ) = NULL;
301 
302 static void (*bind_to_node_ptr)( binding_handler* handler_ptr, int slot_num, int numa_id ) = NULL;
303 static void (*restore_affinity_ptr)( binding_handler* handler_ptr, int slot_num ) = NULL;
304 
305 #if _WIN32 || _WIN64 || __linux__
306 // Table describing how to link the handlers.
307 static const dynamic_link_descriptor TbbBindLinkTable[] = {
308     DLD(__TBB_internal_initialize_numa_topology, initialize_numa_topology_ptr),
309     DLD(__TBB_internal_allocate_binding_handler, allocate_binding_handler_ptr),
310     DLD(__TBB_internal_deallocate_binding_handler, deallocate_binding_handler_ptr),
311     DLD(__TBB_internal_bind_to_node, bind_to_node_ptr),
312     DLD(__TBB_internal_restore_affinity, restore_affinity_ptr)
313 };
314 
315 static const unsigned LinkTableSize = 5;
316 
317 #if TBB_USE_DEBUG
318 #define DEBUG_SUFFIX "_debug"
319 #else
320 #define DEBUG_SUFFIX
321 #endif /* TBB_USE_DEBUG */
322 
323 #if _WIN32 || _WIN64
324 #define LIBRARY_EXTENSION ".dll"
325 #define LIBRARY_PREFIX
326 #elif __linux__
327 #define LIBRARY_EXTENSION __TBB_STRING(.so.3)
328 #define LIBRARY_PREFIX "lib"
329 #endif /* __linux__ */
330 
331 #define TBBBIND_NAME LIBRARY_PREFIX "tbbbind" DEBUG_SUFFIX LIBRARY_EXTENSION
332 #define TBBBIND_2_0_NAME LIBRARY_PREFIX "tbbbind_2_0" DEBUG_SUFFIX LIBRARY_EXTENSION
333 #endif /* _WIN32 || _WIN64 || __linux__ */
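
// For reference, the macros above expand to the concrete library names probed by
// load_tbbbind_shared_object() below, e.g. in release builds:
//   Linux:   "libtbbbind_2_0.so.3", then "libtbbbind.so.3"
//   Windows: "tbbbind_2_0.dll", then "tbbbind.dll"
// Debug builds insert the "_debug" suffix, e.g. "libtbbbind_2_0_debug.so.3".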
334 
335 // Stubs that will be used if TBBbind library is unavailable.
336 static binding_handler* dummy_allocate_binding_handler ( int ) { return NULL; }
337 static void dummy_deallocate_binding_handler ( binding_handler* ) { }
338 static void dummy_bind_to_node ( binding_handler*, int, int ) { }
339 static void dummy_restore_affinity ( binding_handler*, int ) { }
340 
341 // Representation of NUMA topology information on the TBB side.
342 // The NUMA topology may be initialized by a third-party component (e.g. hwloc)
343 // or just filled by the default stubs (a single NUMA node with index -1 and
344 // the default_num_threads value as its default_concurrency).
345 namespace numa_topology {
346 namespace {
347 int  numa_nodes_count = 0;
348 int* numa_indexes = NULL;
349 int* default_concurrency_list = NULL;
350 static std::atomic<do_once_state> numa_topology_init_state;
351 
352 const char* load_tbbbind_shared_object() {
353 #if _WIN32 || _WIN64 || __linux__
354 #if _WIN32 && !_WIN64
355     // For 32-bit Windows applications, process affinity masks can only support up to 32 logical CPUs.
356     SYSTEM_INFO si;
357     GetNativeSystemInfo(&si);
358     if (si.dwNumberOfProcessors > 32) return nullptr;
359 #endif /* _WIN32 && !_WIN64 */
360     for (const auto& tbbbind_version : {TBBBIND_2_0_NAME, TBBBIND_NAME}) {
361         if (dynamic_link(tbbbind_version, TbbBindLinkTable, LinkTableSize)) {
362             return tbbbind_version;
363         }
364     }
365 #endif /* _WIN32 || _WIN64 || __linux__ */
366     return nullptr;
367 }
368 
369 int processor_groups_num() {
370 #if _WIN32
371     return NumberOfProcessorGroups();
372 #else
373     // Stub to improve code readability by reducing the number of compile-time conditions
374     return 1;
375 #endif
376 }
377 } // anonymous namespace
378 
379 // Tries to load the TBBbind library API; on success, obtains NUMA topology information
380 // from it, otherwise fills the NUMA topology with stubs.
381 void initialization_impl() {
382     governor::one_time_init();
383 
384     if (const char* tbbbind_name = load_tbbbind_shared_object()) {
385         initialize_numa_topology_ptr(
386             processor_groups_num(), numa_nodes_count, numa_indexes, default_concurrency_list
387         );
388 
389         if (numa_nodes_count==1 && numa_indexes[0] >= 0) {
390             __TBB_ASSERT(
391                 (int)governor::default_num_threads() == default_concurrency_list[numa_indexes[0]],
392                 "default_concurrency() should be equal to governor::default_num_threads() on single NUMA node systems."
393             );
394         }
395         PrintExtraVersionInfo( "TBBBIND", tbbbind_name );
396         return;
397     }
398 
399     static int dummy_index = -1;
400     static int dummy_concurrency = governor::default_num_threads();
401 
402     numa_nodes_count = 1;
403     numa_indexes = &dummy_index;
404     default_concurrency_list = &dummy_concurrency;
405 
406     allocate_binding_handler_ptr = dummy_allocate_binding_handler;
407     deallocate_binding_handler_ptr = dummy_deallocate_binding_handler;
408 
409     bind_to_node_ptr = dummy_bind_to_node;
410     restore_affinity_ptr = dummy_restore_affinity;
411     PrintExtraVersionInfo( "TBBBIND", "UNAVAILABLE" );
412 }
413 
414 void initialize() {
415     atomic_do_once(initialization_impl, numa_topology_init_state);
416 }
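
/*
    atomic_do_once() guarantees that initialization_impl() runs exactly once even when
    numa_node_count(), fill_numa_indices(), and numa_default_concurrency() race on first
    use. A rough standard-library analogue of the same idiom (illustrative only, not the
    actual implementation):

        #include <mutex>

        static std::once_flag numa_once;    // hypothetical flag

        void initialize_once() {
            std::call_once(numa_once, initialization_impl);
        }
*/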
417 } // namespace numa_topology
418 
419 binding_handler* construct_binding_handler(int slot_num) {
420     __TBB_ASSERT(allocate_binding_handler_ptr, "tbbbind loading was not performed");
421     return allocate_binding_handler_ptr(slot_num);
422 }
423 
424 void destroy_binding_handler(binding_handler* handler_ptr) {
425     __TBB_ASSERT(deallocate_binding_handler_ptr, "tbbbind loading was not performed");
426     deallocate_binding_handler_ptr(handler_ptr);
427 }
428 
429 void bind_thread_to_node(binding_handler* handler_ptr, int slot_num, int numa_id) {
430     __TBB_ASSERT(slot_num >= 0, "Negative thread index");
431     __TBB_ASSERT(bind_to_node_ptr, "tbbbind loading was not performed");
432     bind_to_node_ptr(handler_ptr, slot_num, numa_id);
433 }
434 
435 void restore_affinity_mask(binding_handler* handler_ptr, int slot_num) {
436     __TBB_ASSERT(slot_num >= 0, "Negative thread index");
437     __TBB_ASSERT(restore_affinity_ptr, "tbbbind loading was not performed");
438     restore_affinity_ptr(handler_ptr, slot_num);
439 }
440 
441 unsigned __TBB_EXPORTED_FUNC numa_node_count() {
442     numa_topology::initialize();
443     return numa_topology::numa_nodes_count;
444 }
445 void __TBB_EXPORTED_FUNC fill_numa_indices(int* index_array) {
446     numa_topology::initialize();
447     for (int i = 0; i < numa_topology::numa_nodes_count; i++) {
448         index_array[i] = numa_topology::numa_indexes[i];
449     }
450 }
451 int __TBB_EXPORTED_FUNC numa_default_concurrency(int node_id) {
452     if (node_id >= 0) {
453         numa_topology::initialize();
454         return numa_topology::default_concurrency_list[node_id];
455     }
456     return governor::default_num_threads();
457 }
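
/*
    The three exported functions above back the public NUMA queries. A user-side sketch,
    assuming the oneapi/tbb/info.h API of this revision (tbb::info::numa_nodes and
    task_arena::constraints):

        #include <oneapi/tbb/info.h>
        #include <oneapi/tbb/task_arena.h>
        #include <vector>

        void pin_to_first_numa_node() {
            // numa_nodes() ends up calling numa_node_count() and fill_numa_indices();
            // with the stub topology it returns a single id equal to -1.
            std::vector<oneapi::tbb::numa_node_id> ids = oneapi::tbb::info::numa_nodes();
            oneapi::tbb::task_arena arena(oneapi::tbb::task_arena::constraints(ids.front()));
            arena.execute([] { });  // NUMA-pinned work would run here
        }
*/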
458 #endif /* __TBB_NUMA_SUPPORT */
459 
460 } // namespace r1
461 } // namespace detail
462 } // namespace tbb
463