/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "governor.h"
#include "main.h"
#include "thread_data.h"
#include "market.h"
#include "arena.h"
#include "dynamic_link.h"
#include "concurrent_monitor.h"

#include "oneapi/tbb/task_group.h"
#include "oneapi/tbb/global_control.h"
#include "oneapi/tbb/tbb_allocator.h"
#include "oneapi/tbb/info.h"

#include "task_dispatcher.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <atomic>
#include <algorithm>

namespace tbb {
namespace detail {
namespace r1 {

void clear_address_waiter_table();

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
//! global_control.cpp contains definition
bool remove_and_check_if_empty(d1::global_control& gc);
bool is_present(d1::global_control& gc);
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE

namespace rml {
tbb_server* make_private_server( tbb_client& client );
} // namespace rml

//------------------------------------------------------------------------
// governor
//------------------------------------------------------------------------

void governor::acquire_resources () {
#if __TBB_USE_POSIX
    int status = theTLS.create(auto_terminate);
#else
    int status = theTLS.create();
#endif
    if( status )
        handle_perror(status, "TBB failed to initialize task scheduler TLS\n");
    detect_cpu_features(cpu_features);

    is_rethrow_broken = gcc_rethrow_exception_broken();
}

void governor::release_resources () {
    theRMLServerFactory.close();
    destroy_process_mask();

    __TBB_ASSERT(!(__TBB_InitOnce::initialization_done() && theTLS.get()), "TBB is unloaded while thread data still alive?");

    int status = theTLS.destroy();
    if( status )
        runtime_warning("failed to destroy task scheduler TLS: %s", std::strerror(status));
    clear_address_waiter_table();

    dynamic_unlink_all();
}

rml::tbb_server* governor::create_rml_server ( rml::tbb_client& client ) {
    rml::tbb_server* server = NULL;
    if( !UsePrivateRML ) {
        ::rml::factory::status_type status = theRMLServerFactory.make_server( server, client );
        if( status != ::rml::factory::st_success ) {
            UsePrivateRML = true;
            runtime_warning( "rml::tbb_factory::make_server failed with status %x, falling back on private rml", status );
        }
    }
    if ( !server ) {
        __TBB_ASSERT( UsePrivateRML, NULL );
        server = rml::make_private_server( client );
    }
    __TBB_ASSERT( server, "Failed to create RML server" );
    return server;
}

void governor::one_time_init() {
    if ( !__TBB_InitOnce::initialization_done() ) {
        DoOneTimeInitialization();
    }
}

/*
    There is no portable way to get stack base address in Posix, however modern
    Linux versions provide the pthread_getattr_np API that can be used to obtain a
    thread's stack size and base address. Unfortunately even this function does not
    provide enough information for the main thread on IA-64 architecture (RSE spill
    area and memory stack are allocated as two separate discontinuous chunks of memory),
    and there is no portable way to discern the main and the secondary threads.
    Thus for macOS* and IA-64 architecture for Linux* OS we use the TBB worker stack size for
    all threads and use the current stack top as the stack base. This simplified
    approach is based on the following assumptions:
    1) If the default stack size is insufficient for the user app needs, the
    required amount will be explicitly specified by the user at the point of the
    TBB scheduler initialization (as an argument to the tbb::task_scheduler_init
    constructor).
    2) When an external thread initializes the scheduler, it has enough space on its
    stack. Here "enough" means "at least as much as worker threads have".
    3) If the user app strives to conserve memory by cutting the stack size, it
    should do this for TBB workers too (as in #1).
*/
static std::uintptr_t get_stack_base(std::size_t stack_size) {
    // Stacks grow top-down. The highest address is called the "stack base",
    // and the lowest is the "stack limit".
#if USE_WINTHREAD
    suppress_unused_warning(stack_size);
    NT_TIB* pteb = (NT_TIB*)NtCurrentTeb();
    __TBB_ASSERT(&pteb < pteb->StackBase && &pteb > pteb->StackLimit, "invalid stack info in TEB");
    return reinterpret_cast<std::uintptr_t>(pteb->StackBase);
#else /* USE_PTHREAD */
    // There is no portable way to get the stack base address in Posix, so we use a
    // non-portable method (available on all modern Linux systems) or the simplified
    // approach based on the common-sense assumptions above. The most important assumption
    // is that the main thread's stack size is not less than that of other threads.

    // Points to the lowest addressable byte of a stack.
    void* stack_limit = nullptr;
#if __linux__ && !__bg__
    size_t np_stack_size = 0;
    pthread_attr_t np_attr_stack;
    if (0 == pthread_getattr_np(pthread_self(), &np_attr_stack)) {
        if (0 == pthread_attr_getstack(&np_attr_stack, &stack_limit, &np_stack_size)) {
            __TBB_ASSERT( &stack_limit > stack_limit, "stack size must be positive" );
        }
        pthread_attr_destroy(&np_attr_stack);
    }
#endif /* __linux__ */
    std::uintptr_t stack_base{};
    if (stack_limit) {
        stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
    } else {
        // Use an anchor as a base stack address.
        int anchor{};
        stack_base = reinterpret_cast<std::uintptr_t>(&anchor);
    }
    return stack_base;
#endif /* USE_PTHREAD */
}
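
/*
    Illustrative sketch (not part of the library build): a standalone program showing the
    non-portable Linux query used above. It derives the stack base from the limit and size
    reported by pthread_getattr_np(); requires _GNU_SOURCE on glibc. The program name and
    output format are arbitrary.

        #define _GNU_SOURCE
        #include <pthread.h>
        #include <cstdint>
        #include <cstdio>

        int main() {
            void* stack_limit = nullptr;    // lowest addressable byte of the stack
            std::size_t stack_size = 0;
            pthread_attr_t attr;
            if (0 == pthread_getattr_np(pthread_self(), &attr)) {
                if (0 == pthread_attr_getstack(&attr, &stack_limit, &stack_size)) {
                    // Stacks grow top-down, so the base is the limit plus the size.
                    std::uintptr_t stack_base = reinterpret_cast<std::uintptr_t>(stack_limit) + stack_size;
                    std::printf("limit=%p size=%zu base=%p\n",
                                stack_limit, stack_size, reinterpret_cast<void*>(stack_base));
                }
                pthread_attr_destroy(&attr);
            }
            return 0;
        }
*/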

void governor::init_external_thread() {
    one_time_init();
    // Create new scheduler instance with arena
    int num_slots = default_num_threads();
    // TODO_REVAMP: support an external thread without an implicit arena
    int num_reserved_slots = 1;
    unsigned arena_priority_level = 1; // corresponds to tbb::task_arena::priority::normal
    std::size_t stack_size = 0;
    arena& a = *market::create_arena(num_slots, num_reserved_slots, arena_priority_level, stack_size);
    // We need an internal reference to the market. TODO: is it legacy?
    market::global_market(false);
    // External thread always occupies the first slot
    thread_data& td = *new(cache_aligned_allocate(sizeof(thread_data))) thread_data(0, false);
    td.attach_arena(a, /*slot index*/ 0);
    __TBB_ASSERT(td.my_inbox.is_idle_state(false), nullptr);

    stack_size = a.my_market->worker_stack_size();
    std::uintptr_t stack_base = get_stack_base(stack_size);
    task_dispatcher& task_disp = td.my_arena_slot->default_task_dispatcher();
    task_disp.set_stealing_threshold(calculate_stealing_threshold(stack_base, stack_size));
    td.attach_task_dispatcher(task_disp);

    td.my_arena_slot->occupy();
    a.my_market->add_external_thread(td);
    set_thread_data(td);
}

void governor::auto_terminate(void* tls) {
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr ||
                 get_thread_data_if_initialized() == tls, NULL);
    if (tls) {
        thread_data* td = static_cast<thread_data*>(tls);

        // Only an external thread can be inside an arena during termination.
        if (td->my_arena_slot) {
            arena* a = td->my_arena;
            market* m = a->my_market;

            a->my_observers.notify_exit_observers(td->my_last_observer, td->my_is_worker);

            td->my_task_dispatcher->m_stealing_threshold = 0;
            td->detach_task_dispatcher();
            td->my_arena_slot->release();
            // Release an arena
            a->on_thread_leaving<arena::ref_external>();

            m->remove_external_thread(*td);
            // If there was an associated arena, it added a public market reference
            m->release( /*is_public*/ true, /*blocking_terminate*/ false);
        }

        td->~thread_data();
        cache_aligned_deallocate(td);

        clear_thread_data();
    }
    __TBB_ASSERT(get_thread_data_if_initialized() == nullptr, NULL);
}

void governor::initialize_rml_factory () {
    ::rml::factory::status_type res = theRMLServerFactory.open();
    UsePrivateRML = res != ::rml::factory::st_success;
}

#if __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
void __TBB_EXPORTED_FUNC get(d1::task_scheduler_handle& handle) {
    handle.m_ctl = new(allocate_memory(sizeof(global_control))) global_control(global_control::scheduler_handle, 1);
}

void release_impl(d1::task_scheduler_handle& handle) {
    if (handle.m_ctl != nullptr) {
        handle.m_ctl->~global_control();
        deallocate_memory(handle.m_ctl);
        handle.m_ctl = nullptr;
    }
}

bool finalize_impl(d1::task_scheduler_handle& handle) {
    __TBB_ASSERT_RELEASE(handle, "trying to finalize with null handle");
    market::global_market_mutex_type::scoped_lock lock( market::theMarketMutex );
    bool ok = true; // ok if theMarket does not exist yet
    market* m = market::theMarket; // read the state of theMarket
    if (m != nullptr) {
        lock.release();
        __TBB_ASSERT(is_present(*handle.m_ctl), "finalize or release was already called on this object");
        thread_data* td = governor::get_thread_data_if_initialized();
        if (td) {
            task_dispatcher* task_disp = td->my_task_dispatcher;
            __TBB_ASSERT(task_disp, nullptr);
            if (task_disp->m_properties.outermost && !td->my_is_worker) { // is not inside a parallel region
                governor::auto_terminate(td);
            }
        }
        if (remove_and_check_if_empty(*handle.m_ctl)) {
            ok = m->release(/*is_public*/ true, /*blocking_terminate*/ true);
        } else {
            ok = false;
        }
    }
    return ok;
}

bool __TBB_EXPORTED_FUNC finalize(d1::task_scheduler_handle& handle, std::intptr_t mode) {
    if (mode == d1::release_nothrowing) {
        release_impl(handle);
        return true;
    } else {
        bool ok = finalize_impl(handle);
        // TODO: it is unsafe when finalize is called concurrently with a subsequent library unload
        release_impl(handle);
        if (mode == d1::finalize_throwing && !ok) {
            throw_exception(exception_id::unsafe_wait);
        }
        return ok;
    }
}
#endif // __TBB_SUPPORTS_WORKERS_WAITING_IN_TERMINATE
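
/*
    Illustrative sketch (not part of the library build): how the exported get()/finalize()
    entry points above are typically reached through the public API. This assumes the oneTBB 2021
    wrappers declared in oneapi/tbb/global_control.h (task_scheduler_handle::get(), tbb::finalize);
    the exact spelling may differ between releases.

        #include <oneapi/tbb/global_control.h>
        #include <oneapi/tbb/parallel_for.h>
        #include <new>

        int main() {
            // Ties the handle to the scheduler lifetime (routed to r1::get() above).
            oneapi::tbb::task_scheduler_handle handle = oneapi::tbb::task_scheduler_handle::get();

            oneapi::tbb::parallel_for(0, 100, [](int) {});

            // Blocking finalization: waits for worker threads to complete before returning
            // (routed to r1::finalize() above with a nothrow mode).
            bool ok = oneapi::tbb::finalize(handle, std::nothrow);
            return ok ? 0 : 1;
        }
*/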

#if __TBB_ARENA_BINDING

#if __TBB_WEAK_SYMBOLS_PRESENT
#pragma weak __TBB_internal_initialize_system_topology
#pragma weak __TBB_internal_allocate_binding_handler
#pragma weak __TBB_internal_deallocate_binding_handler
#pragma weak __TBB_internal_apply_affinity
#pragma weak __TBB_internal_restore_affinity
#pragma weak __TBB_internal_get_default_concurrency

extern "C" {
void __TBB_internal_initialize_system_topology(
    size_t groups_num,
    int& numa_nodes_count, int*& numa_indexes_list,
    int& core_types_count, int*& core_types_indexes_list
);

//TODO: consider renaming to `create_binding_handler` and `destroy_binding_handler`
binding_handler* __TBB_internal_allocate_binding_handler( int slot_num, int numa_id, int core_type_id, int max_threads_per_core );
void __TBB_internal_deallocate_binding_handler( binding_handler* handler_ptr );

void __TBB_internal_apply_affinity( binding_handler* handler_ptr, int slot_num );
void __TBB_internal_restore_affinity( binding_handler* handler_ptr, int slot_num );

int __TBB_internal_get_default_concurrency( int numa_id, int core_type_id, int max_threads_per_core );
}
#endif /* __TBB_WEAK_SYMBOLS_PRESENT */

// Stubs that will be used if TBBbind library is unavailable.
static binding_handler* dummy_allocate_binding_handler ( int, int, int, int ) { return nullptr; }
static void dummy_deallocate_binding_handler ( binding_handler* ) { }
static void dummy_apply_affinity ( binding_handler*, int ) { }
static void dummy_restore_affinity ( binding_handler*, int ) { }
static int dummy_get_default_concurrency( int, int, int ) { return governor::default_num_threads(); }

// Handlers for communication with TBBbind
static void (*initialize_system_topology_ptr)(
    size_t groups_num,
    int& numa_nodes_count, int*& numa_indexes_list,
    int& core_types_count, int*& core_types_indexes_list
) = nullptr;

static binding_handler* (*allocate_binding_handler_ptr)( int slot_num, int numa_id, int core_type_id, int max_threads_per_core )
    = dummy_allocate_binding_handler;
static void (*deallocate_binding_handler_ptr)( binding_handler* handler_ptr )
    = dummy_deallocate_binding_handler;
static void (*apply_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
    = dummy_apply_affinity;
static void (*restore_affinity_ptr)( binding_handler* handler_ptr, int slot_num )
    = dummy_restore_affinity;
int (*get_default_concurrency_ptr)( int numa_id, int core_type_id, int max_threads_per_core )
    = dummy_get_default_concurrency;

#if _WIN32 || _WIN64 || __unix__
// Table describing how to link the handlers.
static const dynamic_link_descriptor TbbBindLinkTable[] = {
    DLD(__TBB_internal_initialize_system_topology, initialize_system_topology_ptr),
    DLD(__TBB_internal_allocate_binding_handler, allocate_binding_handler_ptr),
    DLD(__TBB_internal_deallocate_binding_handler, deallocate_binding_handler_ptr),
    DLD(__TBB_internal_apply_affinity, apply_affinity_ptr),
    DLD(__TBB_internal_restore_affinity, restore_affinity_ptr),
    DLD(__TBB_internal_get_default_concurrency, get_default_concurrency_ptr)
};

static const unsigned LinkTableSize = sizeof(TbbBindLinkTable) / sizeof(dynamic_link_descriptor);

#if TBB_USE_DEBUG
#define DEBUG_SUFFIX "_debug"
#else
#define DEBUG_SUFFIX
#endif /* TBB_USE_DEBUG */

#if _WIN32 || _WIN64
#define LIBRARY_EXTENSION ".dll"
#define LIBRARY_PREFIX
#elif __unix__
#define LIBRARY_EXTENSION __TBB_STRING(.so.3)
#define LIBRARY_PREFIX "lib"
#endif /* __unix__ */

#define TBBBIND_NAME LIBRARY_PREFIX "tbbbind" DEBUG_SUFFIX LIBRARY_EXTENSION
#define TBBBIND_2_0_NAME LIBRARY_PREFIX "tbbbind_2_0" DEBUG_SUFFIX LIBRARY_EXTENSION

#define TBBBIND_2_5_NAME LIBRARY_PREFIX "tbbbind_2_5" DEBUG_SUFFIX LIBRARY_EXTENSION
#endif /* _WIN32 || _WIN64 || __unix__ */
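
/*
    Illustrative sketch (not part of the library build): the general shape of the fallback scheme
    used above. Each capability is reached through a function pointer that starts out bound to a
    local stub; if the optional shared object is linked, the pointer is rebound to the real entry
    point, so callers never have to check availability. The names below (probe_widget, try_link)
    are hypothetical; a real loader would resolve the symbol the way dynamic_link() fills
    TbbBindLinkTable.

        #include <cstdio>

        static int stub_probe_widget(int) { return -1; }           // safe default when unavailable
        static int real_probe_widget(int x) { return 2 * x; }      // stands in for a dynamically linked symbol
        static int (*probe_widget_ptr)(int) = stub_probe_widget;   // starts on the stub

        static bool try_link() {
            probe_widget_ptr = real_probe_widget;                  // rebinding performed on successful load
            return true;
        }

        int main() {
            std::printf("before: %d\n", probe_widget_ptr(21));     // -1, stub answer
            try_link();
            std::printf("after:  %d\n", probe_widget_ptr(21));     // 42, real answer
            return 0;
        }
*/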

// Representation of system hardware topology information on the TBB side.
// System topology may be initialized by a third-party component (e.g. hwloc)
// or just filled in with default stubs.
namespace system_topology {

constexpr int automatic = -1;

static std::atomic<do_once_state> initialization_state;

namespace {
int numa_nodes_count = 0;
int* numa_nodes_indexes = nullptr;

int core_types_count = 0;
int* core_types_indexes = nullptr;

const char* load_tbbbind_shared_object() {
#if _WIN32 || _WIN64 || __unix__
#if _WIN32 && !_WIN64
    // For 32-bit Windows applications, process affinity masks can only support up to 32 logical CPUs.
    SYSTEM_INFO si;
    GetNativeSystemInfo(&si);
    if (si.dwNumberOfProcessors > 32) return nullptr;
#endif /* _WIN32 && !_WIN64 */
    for (const auto& tbbbind_version : {TBBBIND_2_5_NAME, TBBBIND_2_0_NAME, TBBBIND_NAME}) {
        if (dynamic_link(tbbbind_version, TbbBindLinkTable, LinkTableSize, nullptr, DYNAMIC_LINK_LOCAL_BINDING)) {
            return tbbbind_version;
        }
    }
#endif /* _WIN32 || _WIN64 || __unix__ */
    return nullptr;
}

int processor_groups_num() {
#if _WIN32
    return NumberOfProcessorGroups();
#else
    // Stub to improve code readability by reducing the number of compile-time conditions
    return 1;
#endif
}
} // internal namespace

// Tries to load the TBBbind library; on success, obtains the NUMA topology information from it,
// otherwise fills the topology with default stubs.
void initialization_impl() {
    governor::one_time_init();

    if (const char* tbbbind_name = load_tbbbind_shared_object()) {
        initialize_system_topology_ptr(
            processor_groups_num(),
            numa_nodes_count, numa_nodes_indexes,
            core_types_count, core_types_indexes
        );

        PrintExtraVersionInfo("TBBBIND", tbbbind_name);
        return;
    }

    static int dummy_index = automatic;

    numa_nodes_count = 1;
    numa_nodes_indexes = &dummy_index;

    core_types_count = 1;
    core_types_indexes = &dummy_index;

    PrintExtraVersionInfo("TBBBIND", "UNAVAILABLE");
}

void initialize() {
    atomic_do_once(initialization_impl, initialization_state);
}
} // namespace system_topology

binding_handler* construct_binding_handler(int slot_num, int numa_id, int core_type_id, int max_threads_per_core) {
    system_topology::initialize();
    return allocate_binding_handler_ptr(slot_num, numa_id, core_type_id, max_threads_per_core);
}

void destroy_binding_handler(binding_handler* handler_ptr) {
    __TBB_ASSERT(deallocate_binding_handler_ptr, "tbbbind loading was not performed");
    deallocate_binding_handler_ptr(handler_ptr);
}

void apply_affinity_mask(binding_handler* handler_ptr, int slot_index) {
    __TBB_ASSERT(slot_index >= 0, "Negative thread index");
    __TBB_ASSERT(apply_affinity_ptr, "tbbbind loading was not performed");
    apply_affinity_ptr(handler_ptr, slot_index);
}

void restore_affinity_mask(binding_handler* handler_ptr, int slot_index) {
    __TBB_ASSERT(slot_index >= 0, "Negative thread index");
    __TBB_ASSERT(restore_affinity_ptr, "tbbbind loading was not performed");
    restore_affinity_ptr(handler_ptr, slot_index);
}

unsigned __TBB_EXPORTED_FUNC numa_node_count() {
    system_topology::initialize();
    return system_topology::numa_nodes_count;
}

void __TBB_EXPORTED_FUNC fill_numa_indices(int* index_array) {
    system_topology::initialize();
    std::memcpy(index_array, system_topology::numa_nodes_indexes, system_topology::numa_nodes_count * sizeof(int));
}

int __TBB_EXPORTED_FUNC numa_default_concurrency(int node_id) {
    if (node_id >= 0) {
        system_topology::initialize();
        int result = get_default_concurrency_ptr(
            node_id,
            /*core_type*/system_topology::automatic,
            /*threads_per_core*/system_topology::automatic
        );
        if (result > 0) return result;
    }
    return governor::default_num_threads();
}

unsigned __TBB_EXPORTED_FUNC core_type_count(intptr_t /*reserved*/) {
    system_topology::initialize();
    return system_topology::core_types_count;
}

void __TBB_EXPORTED_FUNC fill_core_type_indices(int* index_array, intptr_t /*reserved*/) {
    system_topology::initialize();
    std::memcpy(index_array, system_topology::core_types_indexes, system_topology::core_types_count * sizeof(int));
}
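
/*
    Illustrative sketch (not part of the library build): how the topology queries exported above
    surface through the public API declared in oneapi/tbb/info.h (the functions named by the
    assertion messages below). Assumes the oneTBB 2021 wrappers tbb::info::numa_nodes() and
    task_arena::constraints; exact spelling may vary between releases.

        #include <oneapi/tbb/info.h>
        #include <oneapi/tbb/task_arena.h>
        #include <vector>

        int main() {
            // Backed by r1::numa_node_count() / r1::fill_numa_indices() above.
            std::vector<oneapi::tbb::numa_node_id> numa_nodes = oneapi::tbb::info::numa_nodes();

            if (!numa_nodes.empty()) {
                // Constraints carrying only a numa_id; the concurrency default is resolved
                // through r1::constraints_default_concurrency() below.
                oneapi::tbb::task_arena arena(oneapi::tbb::task_arena::constraints(numa_nodes.front()));
                arena.execute([] {});  // run NUMA-pinned work inside the arena
            }
            return 0;
        }
*/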

void constraints_assertion(d1::constraints c) {
    bool is_topology_initialized = system_topology::initialization_state == do_once_state::initialized;
    __TBB_ASSERT_RELEASE(c.max_threads_per_core == system_topology::automatic || c.max_threads_per_core > 0,
        "Wrong max_threads_per_core constraints field value.");

    auto numa_nodes_begin = system_topology::numa_nodes_indexes;
    auto numa_nodes_end = system_topology::numa_nodes_indexes + system_topology::numa_nodes_count;
    __TBB_ASSERT_RELEASE(
        c.numa_id == system_topology::automatic ||
        (is_topology_initialized && std::find(numa_nodes_begin, numa_nodes_end, c.numa_id) != numa_nodes_end),
        "The constraints::numa_id value is not known to the library. Use tbb::info::numa_nodes() to get the list of possible values.");

    int* core_types_begin = system_topology::core_types_indexes;
    int* core_types_end = system_topology::core_types_indexes + system_topology::core_types_count;
    __TBB_ASSERT_RELEASE(c.core_type == system_topology::automatic ||
        (is_topology_initialized && std::find(core_types_begin, core_types_end, c.core_type) != core_types_end),
        "The constraints::core_type value is not known to the library. Use tbb::info::core_types() to get the list of possible values.");
}

int __TBB_EXPORTED_FUNC constraints_default_concurrency(const d1::constraints& c, intptr_t /*reserved*/) {
    constraints_assertion(c);

    if (c.numa_id >= 0 || c.core_type >= 0 || c.max_threads_per_core > 0) {
        system_topology::initialize();
        return get_default_concurrency_ptr(c.numa_id, c.core_type, c.max_threads_per_core);
    }
    return governor::default_num_threads();
}

int __TBB_EXPORTED_FUNC constraints_threads_per_core(const d1::constraints&, intptr_t /*reserved*/) {
    return system_topology::automatic;
}
#endif /* __TBB_ARENA_BINDING */

} // namespace r1
} // namespace detail
} // namespace tbb