/*
    Copyright (c) 2017-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <stdlib.h>

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
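// POSIX implementation of the launch helper: creates a thread that runs
// thread_routine(arg) with the requested stack size and returns its pthread
// handle, or 0 if any of the pthread calls fails.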
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
    if( pthread_attr_destroy( &s ) ) return 0;
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;

static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len+1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    strncpy(templ, prefix, plen+1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    strncat(templ, postfix, xlen + 1);
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Can not get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning( "Can not get RML semaphore name" );
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                    /----------stop------\
                    |           ^        |
                    V           |        |
        init --> starting --> normal     |
          |         |           |        |
          |         V           |        |
          \------> quit <-------/ <------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

//TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
        : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should wait for workers to finish when terminating
    const bool my_join_workers;

    //! Service thread that wakes up sleeping workers
    ipc_waker* my_waker;

    //! Service thread that stops workers
    ipc_stopper* my_stopper;

    //! Semaphore to account for active threads
    sem_t* my_active_sem;

    //! Semaphore to account for threads to be stopped
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from asleep list
    bool stop_one();

    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif
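
//! Join the worker's OS thread or detach it, depending on the join flag.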
void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    while( !my_state.compare_exchange_strong( s, st_quit ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.
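
    // Main processing loop: while server slack is non-negative the worker lets the
    // client process its job; otherwise it tries to park itself on the server's
    // asleep list and waits on its monitor until it is woken up or told to stop/quit.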
    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_seq_cst);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.wait();
                my_server.propagate_chain_reaction();
            }
        }
        // memory_order_seq_cst to be strictly ordered after thread_monitor::wait
        state = my_state.load(std::memory_order_seq_cst);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                // Unable to create new thread for process.
                // However, this is an expected situation for the use cases of this coordination server.
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if (st_starting != s) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.
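
    // The waker loops until told to quit: when the server has positive slack it blocks
    // for an "active thread" slot on the shared semaphore and, if slack is still positive,
    // wakes one sleeping worker; otherwise it releases the slot and may sleep on its monitor.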
    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            // Check/set the invariant for sleeping
            if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.wait();
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong(s, st_init);
                if (st_starting != s) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong(s, st_init);
                if (st_starting != s) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might not be set yet
                    // at the time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed);
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!=nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack+=delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case when RML shouldn't provide any worker thread but client has to have at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
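        // In the child process after fork(), rebuild the server object in place so the
        // child starts with fresh worker bookkeeping, and re-register the atfork and
        // atexit hooks that the parent installed in __TBB_make_rml_server.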
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb