/*
    Copyright (c) 2017-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <stdlib.h>
#include <semaphore.h>   // sem_t, sem_open, sem_post, sem_(try|timed)wait
#include <unistd.h>      // getpid
#include <sys/types.h>   // mode_t
#include <cerrno>        // errno, ENOENT
#include <ctime>         // clock_gettime, timespec

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
    if( pthread_attr_destroy( &s ) ) return 0;
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;
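// Editorial note: configuration is taken entirely from the environment. A
// hypothetical launch, assuming get_enable_flag() treats the variable being
// set as "enabled" and get_num_threads() parses a decimal count, might look
// like:
//
//   IPC_ENABLE=1 MAX_THREADS=4 ./app
//
// IPC_ACTIVE_SEMAPHORE / IPC_STOP_SEMAPHORE override the generated semaphore
// names; otherwise the prefixes above are passed to get_shared_name().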
static std::atomic<int> my_global_thread_count;

using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of a copy.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len + 1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}
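// Worked example (editorial): with IPC_ACTIVE_SEMAPHORE unset,
// get_active_sem_name() falls through to get_shared_name(IPC_ACTIVE_SEM_PREFIX),
// so every cooperating process derives the same "/__IPC_active..." name; with
// IPC_ACTIVE_SEMAPHORE=/my_pool exported, all of them open "/my_pool" instead,
// letting independent pools coexist on one machine. Callers own the returned
// buffer and must delete[] it, as the call sites below do.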
void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    std::strncpy(templ, prefix, plen + 1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    std::strncat(templ, postfix, xlen + 1);
    // strlen does not count the terminating null character.
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // POSIX requires mode and initial value arguments whenever O_CREAT is passed.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Can not get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 && errno!=ENOENT ) {
        // ENOENT means there is no semaphore with the given name: nothing to do.
        runtime_warning("Can not release RML semaphore");
        delete[] sem_name;
        return;
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Can not get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 && errno!=ENOENT ) {
        // ENOENT means there is no semaphore with the given name: nothing to do.
        runtime_warning("Can not release RML semaphore");
        delete[] sem_name;
        return;
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in the finite-state machine that controls the worker.
    /** State diagram:
                  /----------stop---\
                  |            ^    |
                  V            |    |
        init --> starting --> normal|
          |         |           |   |
          |         V           |   |
          \------> quit <-------/<--/
    */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has an associated thread that is starting up.
        st_starting,
        //! Associated thread is doing its normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended its normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on the server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for the list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up the associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

// TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
        : ipc_worker( server, client, i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;

    //! Maximum number of threads to be created.
    /** Threads are created lazily, so the maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus the number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should join workers during termination
    const bool my_join_workers;

    //! Service thread for waking up workers
    ipc_waker* my_waker;

    //! Service thread for stopping workers
    ipc_stopper* my_stopper;

    //! Semaphore accounting active threads
    sem_t* my_active_sem;

    //! Semaphore accounting stopped threads
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, and so on. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. The second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to the list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to the list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from the asleep list
    bool stop_one();

    //! Wait for an active thread
    bool wait_active_thread();

    //! Try to get an active thread
    bool try_get_active_thread();

    //! Release an active thread
    void release_active_thread();

    //! Wait for a thread to stop
    bool wait_stop_thread();

    //! Add a thread to the stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;

public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};
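// Editorial summary of the cross-process protocol implemented below:
// my_active_sem acts as a machine-wide pool of "run permits" (created with an
// initial value of my_n_thread-1 in the ipc_server constructor), so processes
// sharing the semaphore cannot jointly oversubscribe the machine.
// my_stop_sem carries park requests: the ipc_stopper thread waits on it and
// calls stop_one(), which asks a sleeping worker to stop and thereby return
// its permit to the pool. The ipc_waker thread does the reverse: it waits for
// a permit and wakes a sleeping worker whenever my_slack indicates demand.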
//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated the invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in this case the thread was not started yet.
        // For st_starting, the release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    // Transition to st_stop rather than st_quit: a stopped worker keeps its
    // slot and can be relaunched later by wake_or_launch().
    while( !my_state.compare_exchange_strong( s, st_stop ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated the invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_stop overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in this case the thread was not started yet.
        // For st_starting, the release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}
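// Editorial note: start_shutdown() and start_stopping() differ only in the
// target state. st_quit retires the worker permanently, while st_stop lets
// wake_or_launch() later recycle the slot with a freshly launched thread;
// the stopped thread itself still exits run(), cleaning up its job and
// dropping its server reference.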
void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_acquire);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.commit_wait(c);
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
        state = my_state.load(std::memory_order_acquire);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // After this point, remove_server_ref() must be done by the created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                // Unable to create a new thread for the process.
                // However, this is an expected situation for the use cases of this coordination server.
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting!=s ) {
                    // Shutdown was requested during startup. my_handle cannot be released
                    // by start_shutdown, because the my_handle value might not be set yet
                    // at the time of the transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa restores the original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Shutdown was requested during startup. my_handle cannot be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.commit_wait(c);
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // After this point, remove_server_ref() must be done by the created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting!=s ) {
                    // Shutdown was requested during startup. my_handle cannot be released
                    // by start_shutdown, because the my_handle value might not be set yet
                    // at the time of the transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa restores the original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Shutdown was requested during startup. my_handle cannot be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // After this point, remove_server_ref() must be done by the created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting!=s ) {
                    // Shutdown was requested during startup. my_handle cannot be released
                    // by start_shutdown, because the my_handle value might not be set yet
                    // at the time of the transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa restores the original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Shutdown was requested during startup. my_handle cannot be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem )==0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem )==0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction: try to claim a unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop a sleeping worker to combine with the claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}
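// Worked example (editorial): suppose my_slack==0, three workers sleep on the
// asleep list, and adjust_job_count_estimate(+3) obtained two active-thread
// permits. wake_some(3,2) spends two units of additional_slack to pop two
// sleepers, then banks the remaining unit in my_slack. Each woken worker runs
// propagate_chain_reaction(), which claims that banked unit (plus a permit via
// try_get_active_thread()) to wake the third sleeper.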
void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction: try to claim a unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop a sleeping worker to combine with the claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed );
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                // Walk to the last worker on the list that is still in st_normal
                next = current->my_next;
                while( next!=nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack += delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else {
        // Corner case: RML should not provide any worker threads, but the client still needs at least one.
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail
} // namespace tbb
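// Usage sketch (editorial, illustrative only): this translation unit is built
// into an RML server library that a TBB-based process loads through the
// factory machinery. With the environment configured, __RML_open_factory()
// reports compatibility and __TBB_make_rml_server() installs an ipc_server,
// after which all participating processes throttle their workers through the
// shared POSIX semaphores, e.g.:
//
//   IPC_ENABLE=1 MAX_THREADS=8 ./process_a &
//   IPC_ENABLE=1 MAX_THREADS=8 ./process_b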