xref: /oneTBB/src/tbb/governor.cpp (revision 8dcbd5b1)
/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "governor.h"
#include "main.h"
#include "thread_data.h"
#include "market.h"
#include "arena.h"
#include "dynamic_link.h"

#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/global_control.h"
#include "oneapi/tbb/tbb_allocator.h"

#include "task_dispatcher.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
//! global_control.cpp contains definition
bool remove_and_check_if_empty(d1::global_control& gc);
bool is_present(d1::global_control& gc);
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE

namespace rml {
tbb_server* make_private_server( tbb_client& client );
} // namespace rml

//------------------------------------------------------------------------
// governor
//------------------------------------------------------------------------

void governor::acquire_resources () {
#if __TBB_USE_POSIX
    int status = theTLS.create(auto_terminate);
#else
    int status = theTLS.create();
#endif
    if( status )
        handle_perror(status, "TBB failed to initialize task scheduler TLS\n");
    detect_cpu_features(cpu_features);
    is_rethrow_broken = gcc_rethrow_exception_broken();
}

void governor::release_resources () {
    theRMLServerFactory.close();
    destroy_process_mask();

    __TBB_ASSERT(!(__TBB_InitOnce::initialization_done() && theTLS.get()), "TBB is unloaded while thread data still alive?");

    int status = theTLS.destroy();
    if( status )
        runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
    dynamic_unlink_all();
}

rml::tbb_server* governor::create_rml_server ( rml::tbb_client& client ) {
    rml::tbb_server* server = NULL;
    if( !UsePrivateRML ) {
        ::rml::factory::status_type status = theRMLServerFactory.make_server( server, client );
        if( status != ::rml::factory::st_success ) {
            UsePrivateRML = true;
            runtime_warning( "rml::tbb_factory::make_server failed with status %x, falling back on private rml", status );
        }
    }
    if ( !server ) {
        __TBB_ASSERT( UsePrivateRML, NULL );
        server = rml::make_private_server( client );
    }
    __TBB_ASSERT( server, "Failed to create RML server" );
    return server;
}

void governor::one_time_init() {
    if ( !__TBB_InitOnce::initialization_done() ) {
        DoOneTimeInitialization();
    }
}

/*
    There is no portable way to get the stack base address in POSIX, however modern
    Linux versions provide the pthread_getattr_np API that can be used to obtain a
    thread's stack size and base address. Unfortunately even this function does not provide
    enough information for the main thread on IA-64 architecture (RSE spill area
    and memory stack are allocated as two separate discontinuous chunks of memory),
    and there is no portable way to discern the main and the secondary threads.
    Thus for macOS* and for the IA-64 architecture on Linux* OS we use the TBB worker stack size for
    all threads and use the current stack top as the stack base. This simplified
    approach is based on the following assumptions:
    1) If the default stack size is insufficient for the user app needs, the
    required amount will be explicitly specified by the user at the point of
    TBB scheduler initialization (historically an argument to the tbb::task_scheduler_init
    constructor; in oneTBB via global_control::thread_stack_size).
    2) When a master thread initializes the scheduler, it has enough space on its
    stack. Here "enough" means "at least as much as worker threads have".
    3) If the user app strives to conserve memory by cutting the stack size, it
    should do this for TBB workers too (as in assumption 1).
*/
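// Illustrative sketch (an assumption for documentation purposes, not code used by
// this translation unit): with oneTBB the explicit stack size from assumption 1 is
// supplied through global_control before the scheduler is first used, e.g.
//
//     #include <oneapi/tbb/global_control.h>
//
//     // Request 8 MB stacks for TBB worker threads.
//     tbb::global_control stack_ctl(tbb::global_control::thread_stack_size, 8 * 1024 * 1024);
//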
static std::uintptr_t get_stack_base(std::size_t stack_size) {
    // Stacks are growing top-down. Highest address is called "stack base",
    // and the lowest is "stack limit".
#if USE_WINTHREAD
    suppress_unused_warning(stack_size);
    NT_TIB* pteb = (NT_TIB*)NtCurrentTeb();
    __TBB_ASSERT(&pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB");
    return reinterpret_cast<std::uintptr_t>(pteb->StackBase);
#else /* USE_PTHREAD */
    // There is no portable way to get the stack base address in POSIX, so we use a
    // non-portable method (on all modern Linux) or the simplified approach
    // based on the common-sense assumptions. The most important assumption
    // is that the main thread's stack size is not less than that of other threads.

    // Points to the lowest addressable byte of a stack.
    void* stack_limit = nullptr;
#if __linux__ && !__bg__
    size_t np_stack_size = 0;
    pthread_attr_t np_attr_stack;
    if (0 == pthread_getattr_np(pthread_self(), &np_attr_stack)) {
        if (0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size)) {
            // Sanity check: the address of a local variable must lie above the reported stack limit.
            __TBB_ASSERT( &stack_limit > stack_limit, "stack size must be positive" );
        }
        pthread_attr_destroy(&np_attr_stack);
    }
#endif /* __linux__ */
    std::uintptr_t stack_base{};
    if (stack_limit) {
        stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
    } else {
        // Use an anchor as a base stack address.
        int anchor{};
        stack_base = reinterpret_cast<std::uintptr_t>(&anchor);
    }
    return stack_base;
#endif /* USE_PTHREAD */
}

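// Lazily attaches the calling external ("master") thread to the scheduler.
// Context sketch (based on the surrounding governor headers and stated here as an
// assumption rather than something defined in this translation unit): this path is
// reached from governor::get_thread_data() the first time a non-worker thread
// enters the library, e.g. on its first call such as
//
//     tbb::parallel_for(0, n, [](int i) { /* ... */ });
//
// after which the thread_data published below is found in TLS by later calls.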
void governor::init_external_thread() {
    one_time_init();
    // Create new scheduler instance with arena
    int num_slots = default_num_threads();
    // TODO_REVAMP: support arena-less masters
    int num_reserved_slots = 1;
    unsigned arena_priority_level = 1; // corresponds to tbb::task_arena::priority::normal
    std::size_t stack_size = 0;
    arena& a = *market::create_arena(num_slots, num_reserved_slots, arena_priority_level, stack_size);
    // We need an internal reference to the market. TODO: is it legacy?
    market::global_market(false);
    // Master thread always occupies the first slot
    thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
    td.attach_arena(a, /*slot index*/ 0);

    stack_size = a.my_market->worker_stack_size();
    std::uintptr_t stack_base = get_stack_base(stack_size);
    task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold(stack_base, stack_size));
    td.attach_task_dispatcher(task_disp);

    td.my_arena_slot->occupy();
    a.my_market->add_external_thread(td);
    set_thread_data(td);
}

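// Tears down the calling thread's scheduler state. On POSIX this function is
// registered as the TLS destructor in acquire_resources() above, so it runs
// automatically at thread exit; it is also invoked explicitly from finalize_impl()
// below for the calling thread. (On Windows the equivalent cleanup is assumed to be
// driven from the DLL thread-detach handling outside this translation unit.)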
void governor::auto_terminate(void* tls) {
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr ||
        get_thread_data_if_initialized() == tls, NULL);
    if (tls) {
        thread_data* td = static_cast<thread_data*>(tls);

        // Only an external thread can be inside an arena during termination.
        if (td->my_arena_slot) {
            arena* a = td->my_arena;
            market* m = a->my_market;

            a->my_observers.notify_exit_observers(td->my_last_observer, td->my_is_worker);

            td->my_task_dispatcher->m_stealing_threshold = 0;
            td->detach_task_dispatcher();
            td->my_arena_slot->release();
            // Release an arena
            a->on_thread_leaving<arena::ref_external>();

            m->remove_external_thread(*td);
            // If there was an associated arena, it added a public market reference
            m->release( /*is_public*/ true, /*blocking_terminate*/ false);
        }

        td->~thread_data();
        cache_aligned_deallocate(td);

        clear_thread_data();
    }
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr, NULL);
}

void governor::initialize_rml_factory () {
    ::rml::factory::status_type res = theRMLServerFactory.open();
    UsePrivateRML = res != ::rml::factory::st_success;
}

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
void __TBB_EXPORTED_FUNC get(d1::task_scheduler_handle& handle) {
    handle.m_ctl = new(allocate_memory(sizeof(global_control))) global_control(global_control::scheduler_handle, 1);
}

void release_impl(d1::task_scheduler_handle& handle) {
    if (handle.m_ctl != nullptr) {
        handle.m_ctl->~global_control();
        deallocate_memory(handle.m_ctl);
        handle.m_ctl = nullptr;
    }
}

bool finalize_impl(d1::task_scheduler_handle& handle) {
    market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
    bool ok = true; // ok if theMarket does not exist yet
    market* m = market::theMarket; // read the state of theMarket
    if (m != nullptr) {
        lock.release();
        __TBB_ASSERT(is_present(*handle.m_ctl), "finalize or release was already called on this object");
        thread_data* td = governor::get_thread_data_if_initialized();
        if (td) {
            task_dispatcher* task_disp = td->my_task_dispatcher;
            __TBB_ASSERT(task_disp, nullptr);
            if (task_disp->m_properties.outermost && !td->my_is_worker) { // is not inside a parallel region
                governor::auto_terminate(td);
            }
        }
        if (remove_and_check_if_empty(*handle.m_ctl)) {
            ok = m->release(/*is_public*/ true, /*blocking_terminate*/ true);
        } else {
            ok = false;
        }
    }
    return ok;
}

bool __TBB_EXPORTED_FUNC finalize(d1::task_scheduler_handle& handle, std::intptr_t mode) {
    if (mode == d1::release_nothrowing) {
        release_impl(handle);
        return true;
    } else {
        bool ok = finalize_impl(handle);
        // TODO: it is unsafe if finalize is called concurrently and the library is then unloaded
        release_impl(handle);
        if (mode == d1::finalize_throwing && !ok) {
            throw_exception(exception_id::unsafe_wait);
        }
        return ok;
    }
}
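// Usage sketch (illustrative assumption; the public wrappers that reach get() and
// finalize() above are declared in the oneTBB headers, not in this translation unit):
//
//     tbb::task_scheduler_handle handle = tbb::task_scheduler_handle::get();
//     /* ... parallel work ... */
//     tbb::finalize(handle);                                // throwing form -> finalize_throwing
//     // or: bool ok = tbb::finalize(handle, std::nothrow); // -> finalize_nothrowing
//
// If neither is called, the handle is expected to be released on destruction via the
// release_nothrowing mode, i.e. without blocking for worker threads to complete.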
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE

#if __TBB_NUMA_SUPPORT

#if __TBB_WEAK_SYMBOLS_PRESENT
#pragma weak __TBB_internal_initialize_numa_topology
#pragma weak __TBB_internal_allocate_binding_handler
#pragma weak __TBB_internal_deallocate_binding_handler
#pragma weak __TBB_internal_bind_to_node
#pragma weak __TBB_internal_restore_affinity

extern "C" {
void __TBB_internal_initialize_numa_topology(
    size_t groups_num, int& nodes_count, int*& indexes_list, int*& concurrency_list );

// TODO: consider renaming to `create_binding_handler` and `destroy_binding_handler`
binding_handler* __TBB_internal_allocate_binding_handler( int slot_num );
void __TBB_internal_deallocate_binding_handler( binding_handler* handler_ptr );

void __TBB_internal_bind_to_node( binding_handler* handler_ptr, int slot_num, int numa_id );
void __TBB_internal_restore_affinity( binding_handler* handler_ptr, int slot_num );
}
#endif /* __TBB_WEAK_SYMBOLS_PRESENT */

// Handlers for communication with TBBbind
#if _WIN32 || _WIN64 || __linux__
static void (*initialize_numa_topology_ptr)(
    size_t groups_num, int& nodes_count, int*& indexes_list, int*& concurrency_list ) = NULL;
#endif /* _WIN32 || _WIN64 || __linux__ */

static binding_handler* (*allocate_binding_handler_ptr)( int slot_num ) = NULL;
static void (*deallocate_binding_handler_ptr)( binding_handler* handler_ptr ) = NULL;

static void (*bind_to_node_ptr)( binding_handler* handler_ptr, int slot_num, int numa_id ) = NULL;
static void (*restore_affinity_ptr)( binding_handler* handler_ptr, int slot_num ) = NULL;

#if _WIN32 || _WIN64 || __linux__
// Table describing how to link the handlers.
static const dynamic_link_descriptor TbbBindLinkTable[] = {
    DLD(__TBB_internal_initialize_numa_topology, initialize_numa_topology_ptr),
    DLD(__TBB_internal_allocate_binding_handler, allocate_binding_handler_ptr),
    DLD(__TBB_internal_deallocate_binding_handler, deallocate_binding_handler_ptr),
    DLD(__TBB_internal_bind_to_node, bind_to_node_ptr),
    DLD(__TBB_internal_restore_affinity, restore_affinity_ptr)
};

static const unsigned LinkTableSize = 5;

#if TBB_USE_DEBUG
#define DEBUG_SUFFIX "_debug"
#else
#define DEBUG_SUFFIX
#endif /* TBB_USE_DEBUG */

#if _WIN32 || _WIN64
#define TBBBIND_NAME "tbbbind" DEBUG_SUFFIX ".dll"
#elif __linux__
#define TBBBIND_NAME "libtbbbind" DEBUG_SUFFIX __TBB_STRING(.so.3)
#endif /* __linux__ */
#endif /* _WIN32 || _WIN64 || __linux__ */

// Stubs that will be used if the TBBbind library is unavailable.
static binding_handler* dummy_allocate_binding_handler ( int ) { return NULL; }
static void dummy_deallocate_binding_handler ( binding_handler* ) { }
static void dummy_bind_to_node ( binding_handler*, int, int ) { }
static void dummy_restore_affinity ( binding_handler*, int ) { }

// Representation of NUMA topology information on the TBB side.
// The NUMA topology may be initialized by a third-party component (e.g. hwloc)
// or just filled in by the default stubs (a single NUMA node with index -1 and
// the default_num_threads value as its default_concurrency).
namespace numa_topology {
namespace {
int  numa_nodes_count = 0;
int* numa_indexes = NULL;
int* default_concurrency_list = NULL;
static std::atomic<do_once_state> numa_topology_init_state;
} // unnamed namespace

// Tries to load the TBBbind library; on success, obtains the NUMA topology information
// from it, otherwise fills the NUMA topology in with the stubs above.
// TODO: Add TBBbind loading status if TBB_VERSION is set.
void initialization_impl() {
    governor::one_time_init();

#if _WIN32 || _WIN64 || __linux__
    bool load_tbbbind = true;
#if _WIN32 && !_WIN64
    // For 32-bit Windows applications, process affinity masks can only support up to 32 logical CPUs.
    SYSTEM_INFO si;
    GetNativeSystemInfo(&si);
    load_tbbbind = si.dwNumberOfProcessors <= 32;
#endif /* _WIN32 && !_WIN64 */

    if (load_tbbbind && dynamic_link(TBBBIND_NAME, TbbBindLinkTable, LinkTableSize)) {
        int number_of_groups = 1;
#if _WIN32 || _WIN64
        number_of_groups = NumberOfProcessorGroups();
#endif /* _WIN32 || _WIN64 */
        initialize_numa_topology_ptr(
            number_of_groups, numa_nodes_count, numa_indexes, default_concurrency_list);

        if (numa_nodes_count == 1 && numa_indexes[0] >= 0) {
            __TBB_ASSERT(default_concurrency_list[numa_indexes[0]] == (int)governor::default_num_threads(),
                "default_concurrency() should be equal to governor::default_num_threads() on single "
                "NUMA node systems.");
        }
        return;
    }
#endif /* _WIN32 || _WIN64 || __linux__ */

    static int dummy_index = -1;
    static int dummy_concurrency = governor::default_num_threads();

    numa_nodes_count = 1;
    numa_indexes = &dummy_index;
    default_concurrency_list = &dummy_concurrency;

    allocate_binding_handler_ptr = dummy_allocate_binding_handler;
    deallocate_binding_handler_ptr = dummy_deallocate_binding_handler;

    bind_to_node_ptr = dummy_bind_to_node;
    restore_affinity_ptr = dummy_restore_affinity;
}

void initialize() {
    atomic_do_once(initialization_impl, numa_topology_init_state);
}
} // namespace numa_topology

binding_handler* construct_binding_handler(int slot_num) {
    __TBB_ASSERT(allocate_binding_handler_ptr, "tbbbind loading was not performed");
    return allocate_binding_handler_ptr(slot_num);
}

void destroy_binding_handler(binding_handler* handler_ptr) {
    __TBB_ASSERT(deallocate_binding_handler_ptr, "tbbbind loading was not performed");
    deallocate_binding_handler_ptr(handler_ptr);
}

void bind_thread_to_node(binding_handler* handler_ptr, int slot_num, int numa_id) {
    __TBB_ASSERT(slot_num >= 0, "Negative thread index");
    __TBB_ASSERT(bind_to_node_ptr, "tbbbind loading was not performed");
    bind_to_node_ptr(handler_ptr, slot_num, numa_id);
}

void restore_affinity_mask(binding_handler* handler_ptr, int slot_num) {
    __TBB_ASSERT(slot_num >= 0, "Negative thread index");
    __TBB_ASSERT(restore_affinity_ptr, "tbbbind loading was not performed");
    restore_affinity_ptr(handler_ptr, slot_num);
}

unsigned __TBB_EXPORTED_FUNC numa_node_count() {
    numa_topology::initialize();
    return numa_topology::numa_nodes_count;
}
void __TBB_EXPORTED_FUNC fill_numa_indices(int* index_array) {
    numa_topology::initialize();
    for (int i = 0; i < numa_topology::numa_nodes_count; i++) {
        index_array[i] = numa_topology::numa_indexes[i];
    }
}
int __TBB_EXPORTED_FUNC numa_default_concurrency(int node_id) {
    if (node_id >= 0) {
        numa_topology::initialize();
        return numa_topology::default_concurrency_list[node_id];
    }
    return governor::default_num_threads();
}
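
// Usage sketch (illustrative assumption; the public entry points that forward to the
// three exported functions above are declared in oneapi/tbb/info.h and
// oneapi/tbb/task_arena.h, not in this translation unit):
//
//     std::vector<tbb::numa_node_id> nodes = tbb::info::numa_nodes();
//     tbb::task_arena arena(tbb::task_arena::constraints(nodes[0]));
//     arena.execute([] { /* work bound to the first NUMA node */ });
//
// With the stub topology above, numa_nodes() reports a single node with id -1 and
// the binding calls are no-ops.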
#endif /* __TBB_NUMA_SUPPORT */

} // namespace r1
} // namespace detail
} // namespace tbb