/*
    Copyright (c) 2017-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <stdlib.h>

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};
#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) {
            pthread_attr_destroy( &s );
            return 0;
        }
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) {
        pthread_attr_destroy( &s );
        return 0;
    }
    // Attribute destruction failure must not discard an already started thread.
    pthread_attr_destroy( &s );
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;

static std::atomic<int> my_global_thread_count;

using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;
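// Both coordination semaphores are looked up by name at runtime. By default
// the name is derived from the prefix via get_shared_name() (from ipc_utils);
// setting the corresponding environment variable overrides it, e.g.
// (hypothetical values, for illustration only):
//   IPC_ACTIVE_SEMAPHORE=/my_app_active IPC_STOP_SEMAPHORE=/my_app_stop ./app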
char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len + 1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}
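// set_sem_name() publishes a fresh, unique semaphore name through the given
// environment variable. For example, with prefix "/__IPC_active" the template
// below becomes "/__IPC_active_XXXXXX" and mktemp() replaces the X's with a
// unique suffix.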
void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    strncpy(templ, prefix, plen + 1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    strncat(templ, postfix, xlen + 1);
    // strlen does not count the terminating null character.
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // O_CREAT requires the mode and initial value arguments;
        // sem_open reports failure with SEM_FAILED, not a null pointer.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem != SEM_FAILED, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Cannot get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning( "Cannot get RML semaphore name" );
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;
}
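// The set_*_sem_name(), release_resources() and release_semaphores() entry
// points above are extern "C", presumably so a controlling process can
// resolve them by symbol name (e.g. via dlsym) to set up fresh semaphore
// names before spawning workers and to clean the semaphores up afterwards.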
class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                    /----------stop---\
                    |           ^     |
                    V           |     |
        init --> starting --> normal  |
          |         |           |     |
          |         V           |     |
          \------> quit <-------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has an associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};
//TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
        : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};
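// The server below coordinates three kinds of threads: an array of
// padded_ipc_worker objects that execute client jobs, one ipc_waker service
// thread that wakes workers when both work and a cross-process "activity
// token" are available, and one ipc_stopper service thread that puts workers
// back to sleep when their tokens are requested elsewhere.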
class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;

    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should wait for workers to finish during termination
    const bool my_join_workers;

    //! Service thread for waking up workers
    ipc_waker* my_waker;

    //! Service thread for stopping workers
    ipc_stopper* my_stopper;

    //! Semaphore that counts active threads
    sem_t* my_active_sem;

    //! Semaphore that counts threads to be stopped
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from asleep list
    bool stop_one();
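    // The helpers below treat the two named POSIX semaphores as cross-process
    // token pools: my_active_sem holds "permission to run a thread" tokens
    // shared by every process that opened the same semaphore name, while
    // my_stop_sem carries pending requests to put one running thread to sleep.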
    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_master(::rml::server::execution_resource_t&) override {}
    void unregister_master(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};
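// Lifetime note: my_ref_count starts at 1 for the connection itself; every
// successfully launched thread adds one reference and drops it in run() on
// exit. Once request_close_connection() has released the connection's
// reference, the last thread to leave triggers remove_server_ref() and the
// server destroys itself.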
//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, the release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    while( !my_state.compare_exchange_strong( s, st_quit ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, the release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}
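// Note: start_shutdown() and start_stopping() both drive the worker to
// st_quit; the difference is that start_shutdown() asserts the worker was not
// already quitting, while start_stopping() tolerates any prior state.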
void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_acquire);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.commit_wait(c);
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
        state = my_state.load(std::memory_order_acquire);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}
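// wake_or_launch() transfers a worker from st_init or st_stop to st_starting
// and creates an OS thread for it; a worker that is merely asleep in st_normal
// is woken through its thread monitor instead. A false return value means the
// thread could not be created and the caller must recycle the wake-up token.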
inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                // Unable to create a new thread for the process.
                // However, this is an expected situation for the use cases of this coordination server.
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might be not set yet
                    // at time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
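// The waker is a single service thread that blocks on the activity-token
// semaphore whenever there is positive slack, so that a token freed by any
// cooperating process translates into one local worker being woken up.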
void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.commit_wait(c);
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }

    my_server.remove_server_ref();
}
inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might be not set yet
                    // at time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
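// The stopper is the counterpart of the waker: it waits for requests on
// my_stop_sem and stops one sleeping worker per request; if no worker can be
// stopped right now, the request is re-posted and the stopper pauses.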
void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might be not set yet
                    // at time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    // sem_open reports failure with SEM_FAILED, not a null pointer.
    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem != SEM_FAILED, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem != SEM_FAILED, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}
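// Note: when the active-threads semaphore is first created, its value is
// my_n_thread - 1, matching default_concurrency(); presumably one slot is
// implicitly reserved for the client's own master thread. The stop semaphore
// starts at zero, i.e. with no pending stop requests.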
ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}
inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem )==0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem )==0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}
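// Note: wait_stop_thread() above uses sem_timedwait with a one-second
// deadline rather than a blocking sem_wait, so the stopper thread re-checks
// my_state at least once per second and can notice a shutdown request even
// when no stop requests arrive.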
void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}
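// When wake_or_launch() fails (the OS could not create a thread), the failed
// wakee is forced back onto the asleep list, a stop request is posted, and
// the activity token that was claimed for it is returned to the pool. The
// same recovery path appears in wake_one_forced() below.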
void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!=nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack += delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case when RML shouldn't provide any worker thread but client has to have at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}
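// Fork handling: after a fork, the child's copy of the server references
// threads that do not exist in the child process. The atfork child handler
// below therefore reconstructs the server in place over the same
// cache-aligned storage and re-registers both the handler and the exit hook
// for the new process.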
void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb