/*
    Copyright (c) 2017-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <stdlib.h>

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type
ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
    if( pthread_attr_destroy( &s ) ) return 0;
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;

static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
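        // The environment value is copied here so that callers can uniformly release the
        // returned name with delete[] (as they do below), regardless of which branch produced it.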
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len+1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    std::strncpy(templ, prefix, plen+1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    std::strncat(templ, postfix, xlen + 1);
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
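    // Note: mktemp() only rewrites the template in place and does not create or reserve anything,
    // so the generated name can race with other processes between this call and the later
    // sem_open() (hence the TODO above).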
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Can not get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning( "Can not get RML semaphore name" );
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                            /----------stop---\
                            |           ^     |
                            V           |     |
                init --> starting --> normal  |
                  |         |           |     |
                  |         V           |     |
                  \------> quit <-------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

//TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
    : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should wait for (join) its workers during termination
    const bool my_join_workers;

    //! Service thread for waking up workers
    ipc_waker* my_waker;

    //! Service thread to stop threads
    ipc_stopper* my_stopper;

    //! Semaphore to account for active threads
    sem_t* my_active_sem;

    //! Semaphore to account for threads to be stopped
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from the asleep list
    bool stop_one();

    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // There is no need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting the release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    while( !my_state.compare_exchange_strong( s, st_quit ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // There is no need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting the release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.
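    // Worker loop (below): create the client job once, then repeatedly either process it
    // while the server's slack is non-negative, or park on the server's asleep list and
    // wait on the monitor until notified, told to stop, or told to quit.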

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_acquire);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.commit_wait(c);
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
        state = my_state.load(std::memory_order_acquire);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                // Unable to create a new thread for the process.
                // However, this is an expected situation for the use cases of this coordination server.
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if (st_starting != s) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.
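    // Waker loop (below): while not asked to quit, whenever the server has positive slack,
    // claim an active-thread token and use it to wake one sleeping worker; otherwise park
    // on the monitor until more slack appears or shutdown is requested.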

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.commit_wait(c);
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong(s, st_init);
                if (st_starting != s) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong(s, st_init);
                if (st_starting != s) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
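    // The active-threads semaphore starts at my_n_thread - 1, which matches default_concurrency();
    // presumably one slot is implicitly reserved for the client's own (master) thread.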
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
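    // Unlike try_insert_in_asleep_list(), the insertion is unconditional here:
    // slack is raised and the worker is queued even if there is outstanding work.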
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed);
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!= nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack+=delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case: RML should not provide any worker threads, but the client still needs at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server =
            static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb