/*
    Copyright (c) 2017-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <semaphore.h> // sem_open, sem_wait, sem_post, ... (may also come in transitively)
#include <errno.h>     // errno, ENOENT
#include <time.h>      // clock_gettime
#include <unistd.h>    // getpid
#include <stdlib.h>

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
    if( pthread_attr_destroy( &s ) ) return 0;
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;
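
// All knobs for this server come from the environment: IPC_ENABLE turns the
// shared-memory coordination layer on, MAX_THREADS caps the worker pool, and
// IPC_ACTIVE_SEMAPHORE / IPC_STOP_SEMAPHORE override the generated POSIX
// semaphore names. A minimal sketch of how a client process might opt in
// before initializing TBB (the exact accepted values are defined by
// get_enable_flag/get_num_threads in ipc_utils):
//
//     setenv("IPC_ENABLE", "1", /*overwrite=*/1);
//     setenv("MAX_THREADS", "8", /*overwrite=*/1);
//     // ... then use TBB as usual; __RML_open_factory checks the flag.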

static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len + 1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    std::strncpy(templ, prefix, plen + 1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    std::strncat(templ, postfix, xlen + 1);
    // strlen excludes the terminator, so the concatenated name is exactly plen+xlen long.
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
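    // Note: mktemp below is used only to generate a unique name; no file is
    // created, so the usual mktemp file-creation race does not apply directly.
    // The worst case under concurrent callers is presumably a semaphore name
    // collision, which is why the TODO above suggests mkstemp-style generation.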
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // O_CREAT requires the mode and initial-value arguments; the value 0 is
        // used only if the semaphore does not exist yet.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Cannot get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Cannot get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                   /----------stop---\
                   |           ^     |
                   V           |     |
       init --> starting --> normal  |
         |         |           |     |
         |         V           |     |
         \------> quit <-------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has an associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();
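
    // Unlike a classic RML worker (init -> starting -> normal -> quit), this
    // worker can also be parked in st_stop by the stopper thread and later
    // relaunched by wake_or_launch() once an active-thread slot becomes
    // available again; see the state diagram above.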
    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

// TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
        : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;

    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should wait for workers to finish when terminating
    const bool my_join_workers;

    //! Service thread that wakes workers
    ipc_waker* my_waker;

    //! Service thread that stops workers
    ipc_stopper* my_stopper;

    //! Semaphore to account for active threads
    sem_t* my_active_sem;
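
    // Reading of the cross-process protocol (based on the sem_open calls in the
    // constructor below): my_active_sem starts at my_n_thread-1 and meters how
    // many workers may run concurrently across all cooperating processes;
    // my_stop_sem counts outstanding requests to park a running worker.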
    //! Semaphore to account for stopped threads
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from the asleep list
    bool stop_one();

    //! Wait for an active thread slot
    bool wait_active_thread();

    //! Try to get an active thread slot without blocking
    bool try_get_active_thread();

    //! Release an active thread slot
    void release_active_thread();

    //! Wait for a thread to stop
    bool wait_stop_thread();

    //! Add a thread to the stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
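// Each service thread enters through a static thread_routine() trampoline that
// recovers its object from the void* argument. AVOID_64K_ALIASING(my_index)
// offsets each thread's stack usage by a per-index alloca (hence the
// unreferenced 'sink_for_alloca' warning suppressed below), so that worker
// stacks placed at allocation-granularity boundaries do not all contend for
// the same cache sets.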
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    while( !my_state.compare_exchange_strong( s, st_quit ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.
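
    // The loop below is the worker's whole life cycle: process jobs while the
    // server has non-negative slack, otherwise try to park on the asleep list,
    // returning the active-thread slot to the shared semaphore first so that
    // a worker in another process can use it.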

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_seq_cst);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.wait();
                my_server.propagate_chain_reaction();
            }
        }
        // memory_order_seq_cst to be strictly ordered after thread_monitor::wait
        state = my_state.load(std::memory_order_seq_cst);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                // Unable to create a new thread for the process.
                // However, this is an expected situation for the use cases of this coordination server.
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because the my_handle value might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            // Check/set the invariant for sleeping
            if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.wait();
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong(s, st_init);
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because the my_handle value might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup.
            // my_handle can't be released by start_shutdown, because the
            // my_handle value might not be set yet at the time of transition
            // from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong(s, st_init);
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because the my_handle value might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
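    // Increment optimistically: if the resulting slack is still non-positive,
    // nobody needs this worker and it may park; otherwise hand the unit of
    // slack back and keep running.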
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim a unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
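            // Raising my_slack while still holding my_asleep_list_mutex keeps
            // the invariant (documented at my_slack) that slack is only ever
            // raised under the lock.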
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim a unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed );
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!=nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack += delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case when RML shouldn't provide any worker thread but the client still needs at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
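    // Return any active-thread slots this process still holds so that sibling
    // processes sharing the named semaphore are not starved after we exit.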
    release_resources();
}

void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail
} // namespace tbb