151c0b2f7Stbbdev /*
2690aaf49SSergey Zheltov Copyright (c) 2017-2022 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev #include <atomic>
1851c0b2f7Stbbdev #include <cstring>
1951c0b2f7Stbbdev #include <cstdlib>
2051c0b2f7Stbbdev
2151c0b2f7Stbbdev #include "../../src/tbb/rml_tbb.h"
2251c0b2f7Stbbdev #include "../../src/tbb/rml_thread_monitor.h"
2351c0b2f7Stbbdev #include "../../src/tbb/scheduler_common.h"
2451c0b2f7Stbbdev #include "../../src/tbb/governor.h"
2551c0b2f7Stbbdev #include "../../src/tbb/misc.h"
2651c0b2f7Stbbdev #include "tbb/cache_aligned_allocator.h"
2751c0b2f7Stbbdev
2851c0b2f7Stbbdev #include "ipc_utils.h"
2951c0b2f7Stbbdev
3051c0b2f7Stbbdev #include <fcntl.h>
3151c0b2f7Stbbdev #include <stdlib.h>
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev namespace rml {
3451c0b2f7Stbbdev namespace internal {
3551c0b2f7Stbbdev
3651c0b2f7Stbbdev static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev typedef versioned_object::version_type version_type;
3951c0b2f7Stbbdev
__RML_open_factory(factory & f,version_type &,version_type)4051c0b2f7Stbbdev extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
4151c0b2f7Stbbdev if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
4251c0b2f7Stbbdev return factory::st_incompatible;
4351c0b2f7Stbbdev }
4451c0b2f7Stbbdev
4551c0b2f7Stbbdev // Hack to keep this library from being closed
4651c0b2f7Stbbdev static std::atomic<bool> one_time_flag{false};
4751c0b2f7Stbbdev bool expected = false;
4851c0b2f7Stbbdev
4951c0b2f7Stbbdev if( one_time_flag.compare_exchange_strong(expected, true) ) {
5051c0b2f7Stbbdev __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
5151c0b2f7Stbbdev #if _WIN32||_WIN64
5251c0b2f7Stbbdev f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
5351c0b2f7Stbbdev #else
5451c0b2f7Stbbdev f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
5551c0b2f7Stbbdev #endif
5651c0b2f7Stbbdev }
5751c0b2f7Stbbdev // End of hack
5851c0b2f7Stbbdev
5951c0b2f7Stbbdev return factory::st_success;
6051c0b2f7Stbbdev }
6151c0b2f7Stbbdev
__RML_close_factory(factory &)6251c0b2f7Stbbdev extern "C" void __RML_close_factory(factory& /*f*/) {
6351c0b2f7Stbbdev }
6451c0b2f7Stbbdev
6551c0b2f7Stbbdev class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
6651c0b2f7Stbbdev public:
ipc_thread_monitor()6751c0b2f7Stbbdev ipc_thread_monitor() : thread_monitor() {}
6851c0b2f7Stbbdev
6951c0b2f7Stbbdev #if USE_WINTHREAD
7051c0b2f7Stbbdev #elif USE_PTHREAD
7151c0b2f7Stbbdev static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
7251c0b2f7Stbbdev #endif
7351c0b2f7Stbbdev };
7451c0b2f7Stbbdev
7551c0b2f7Stbbdev #if USE_WINTHREAD
7651c0b2f7Stbbdev #elif USE_PTHREAD
launch(void * (* thread_routine)(void *),void * arg,size_t stack_size)7751c0b2f7Stbbdev inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
7851c0b2f7Stbbdev pthread_attr_t s;
7951c0b2f7Stbbdev if( pthread_attr_init( &s ) ) return 0;
8051c0b2f7Stbbdev if( stack_size>0 ) {
8151c0b2f7Stbbdev if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
8251c0b2f7Stbbdev }
8351c0b2f7Stbbdev pthread_t handle;
8451c0b2f7Stbbdev if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
8551c0b2f7Stbbdev if( pthread_attr_destroy( &s ) ) return 0;
8651c0b2f7Stbbdev return handle;
8751c0b2f7Stbbdev }
8851c0b2f7Stbbdev #endif
8951c0b2f7Stbbdev
9051c0b2f7Stbbdev }} // rml::internal
9151c0b2f7Stbbdev
9251c0b2f7Stbbdev using rml::internal::ipc_thread_monitor;
9351c0b2f7Stbbdev using tbb::internal::rml::get_shared_name;
9451c0b2f7Stbbdev
9551c0b2f7Stbbdev namespace tbb {
9651c0b2f7Stbbdev namespace detail {
9751c0b2f7Stbbdev
9851c0b2f7Stbbdev namespace r1 {
//! Definition of r1::terminate_on_exception for this library; it unconditionally reports false.
bool terminate_on_exception() {
    constexpr bool should_terminate = false;
    return should_terminate;
}
10251c0b2f7Stbbdev }
10351c0b2f7Stbbdev
10451c0b2f7Stbbdev namespace rml {
10551c0b2f7Stbbdev
10651c0b2f7Stbbdev typedef ipc_thread_monitor::handle_type thread_handle;
10751c0b2f7Stbbdev
10851c0b2f7Stbbdev class ipc_server;
10951c0b2f7Stbbdev
11051c0b2f7Stbbdev static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
11151c0b2f7Stbbdev static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
11251c0b2f7Stbbdev static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
11351c0b2f7Stbbdev static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
11451c0b2f7Stbbdev static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
11551c0b2f7Stbbdev static const mode_t IPC_SEM_MODE = 0660;
11651c0b2f7Stbbdev
11751c0b2f7Stbbdev static std::atomic<int> my_global_thread_count;
11851c0b2f7Stbbdev using tbb_client = tbb::detail::r1::rml::tbb_client;
11951c0b2f7Stbbdev using tbb_server = tbb::detail::r1::rml::tbb_server;
12051c0b2f7Stbbdev using tbb_factory = tbb::detail::r1::rml::tbb_factory;
12151c0b2f7Stbbdev
12251c0b2f7Stbbdev using tbb::detail::r1::runtime_warning;
12351c0b2f7Stbbdev
get_sem_name(const char * name,const char * prefix)12451c0b2f7Stbbdev char* get_sem_name(const char* name, const char* prefix) {
12551c0b2f7Stbbdev __TBB_ASSERT(name != nullptr, nullptr);
12651c0b2f7Stbbdev __TBB_ASSERT(prefix != nullptr, nullptr);
12751c0b2f7Stbbdev char* value = std::getenv(name);
12851c0b2f7Stbbdev std::size_t len = value == nullptr ? 0 : std::strlen(value);
12951c0b2f7Stbbdev if (len > 0) {
13051c0b2f7Stbbdev // TODO: consider returning the original string instead of the copied string.
13151c0b2f7Stbbdev char* sem_name = new char[len + 1];
13251c0b2f7Stbbdev __TBB_ASSERT(sem_name != nullptr, nullptr);
13351c0b2f7Stbbdev std::strncpy(sem_name, value, len+1);
13451c0b2f7Stbbdev __TBB_ASSERT(sem_name[len] == 0, nullptr);
13551c0b2f7Stbbdev return sem_name;
13651c0b2f7Stbbdev } else {
13751c0b2f7Stbbdev return get_shared_name(prefix);
13851c0b2f7Stbbdev }
13951c0b2f7Stbbdev }
14051c0b2f7Stbbdev
get_active_sem_name()14151c0b2f7Stbbdev char* get_active_sem_name() {
14251c0b2f7Stbbdev return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev
get_stop_sem_name()14551c0b2f7Stbbdev char* get_stop_sem_name() {
14651c0b2f7Stbbdev return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
14751c0b2f7Stbbdev }
14851c0b2f7Stbbdev
release_thread_sem(sem_t * my_sem)14951c0b2f7Stbbdev static void release_thread_sem(sem_t* my_sem) {
15051c0b2f7Stbbdev int old = my_global_thread_count.load(std::memory_order_relaxed);
15151c0b2f7Stbbdev do {
15251c0b2f7Stbbdev if( old<=0 ) return;
15351c0b2f7Stbbdev } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
15451c0b2f7Stbbdev if( old>0 ) {
15551c0b2f7Stbbdev sem_post( my_sem );
15651c0b2f7Stbbdev }
15751c0b2f7Stbbdev }
15851c0b2f7Stbbdev
set_sem_name(const char * name,const char * prefix)15951c0b2f7Stbbdev void set_sem_name(const char* name, const char* prefix) {
16051c0b2f7Stbbdev __TBB_ASSERT(name != nullptr, nullptr);
16151c0b2f7Stbbdev __TBB_ASSERT(prefix != nullptr, nullptr);
16251c0b2f7Stbbdev const char* postfix = "_XXXXXX";
16351c0b2f7Stbbdev std::size_t plen = std::strlen(prefix);
16451c0b2f7Stbbdev std::size_t xlen = std::strlen(postfix);
16551c0b2f7Stbbdev char* templ = new char[plen + xlen + 1];
16651c0b2f7Stbbdev __TBB_ASSERT(templ != nullptr, nullptr);
16751c0b2f7Stbbdev strncpy(templ, prefix, plen+1);
16851c0b2f7Stbbdev __TBB_ASSERT(templ[plen] == 0, nullptr);
16951c0b2f7Stbbdev strncat(templ, postfix, xlen + 1);
17051c0b2f7Stbbdev __TBB_ASSERT(std::strlen(templ) == plen + xlen + 1, nullptr);
17151c0b2f7Stbbdev // TODO: consider using mkstemp instead of mktemp.
17251c0b2f7Stbbdev char* sem_name = mktemp(templ);
17351c0b2f7Stbbdev if (sem_name != nullptr) {
17451c0b2f7Stbbdev int status = setenv(name, sem_name, /*overwrite*/ 1);
17551c0b2f7Stbbdev __TBB_ASSERT_EX(status == 0, nullptr);
17651c0b2f7Stbbdev }
17751c0b2f7Stbbdev delete[] templ;
17851c0b2f7Stbbdev }
17951c0b2f7Stbbdev
set_active_sem_name()18051c0b2f7Stbbdev extern "C" void set_active_sem_name() {
18151c0b2f7Stbbdev set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
18251c0b2f7Stbbdev }
18351c0b2f7Stbbdev
set_stop_sem_name()18451c0b2f7Stbbdev extern "C" void set_stop_sem_name() {
18551c0b2f7Stbbdev set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
18651c0b2f7Stbbdev }
18751c0b2f7Stbbdev
release_resources()18851c0b2f7Stbbdev extern "C" void release_resources() {
18951c0b2f7Stbbdev if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
19051c0b2f7Stbbdev char* active_sem_name = get_active_sem_name();
19151c0b2f7Stbbdev sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT );
19251c0b2f7Stbbdev __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
19351c0b2f7Stbbdev delete[] active_sem_name;
19451c0b2f7Stbbdev
19551c0b2f7Stbbdev do {
19651c0b2f7Stbbdev release_thread_sem( my_active_sem );
19751c0b2f7Stbbdev } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
19851c0b2f7Stbbdev }
19951c0b2f7Stbbdev }
20051c0b2f7Stbbdev
release_semaphores()20151c0b2f7Stbbdev extern "C" void release_semaphores() {
20251c0b2f7Stbbdev int status = 0;
20351c0b2f7Stbbdev char* sem_name = nullptr;
20451c0b2f7Stbbdev
20551c0b2f7Stbbdev sem_name = get_active_sem_name();
20651c0b2f7Stbbdev if( sem_name==nullptr ) {
20751c0b2f7Stbbdev runtime_warning("Can not get RML semaphore name");
20851c0b2f7Stbbdev return;
20951c0b2f7Stbbdev }
21051c0b2f7Stbbdev status = sem_unlink( sem_name );
21151c0b2f7Stbbdev if( status!=0 ) {
21251c0b2f7Stbbdev if( errno==ENOENT ) {
21351c0b2f7Stbbdev /* There is no semaphore with the given name, nothing to do */
21451c0b2f7Stbbdev } else {
21551c0b2f7Stbbdev runtime_warning("Can not release RML semaphore");
21651c0b2f7Stbbdev return;
21751c0b2f7Stbbdev }
21851c0b2f7Stbbdev }
21951c0b2f7Stbbdev delete[] sem_name;
22051c0b2f7Stbbdev
22151c0b2f7Stbbdev sem_name = get_stop_sem_name();
22251c0b2f7Stbbdev if( sem_name==nullptr ) {
22351c0b2f7Stbbdev runtime_warning( "Can not get RML semaphore name" );
22451c0b2f7Stbbdev return;
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev status = sem_unlink( sem_name );
22751c0b2f7Stbbdev if( status!=0 ) {
22851c0b2f7Stbbdev if( errno==ENOENT ) {
22951c0b2f7Stbbdev /* There is no semaphore with the given name, nothing to do */
23051c0b2f7Stbbdev } else {
23151c0b2f7Stbbdev runtime_warning("Can not release RML semaphore");
23251c0b2f7Stbbdev return;
23351c0b2f7Stbbdev }
23451c0b2f7Stbbdev }
23551c0b2f7Stbbdev delete[] sem_name;
23651c0b2f7Stbbdev }
23751c0b2f7Stbbdev
23851c0b2f7Stbbdev class ipc_worker: no_copy {
23951c0b2f7Stbbdev protected:
24051c0b2f7Stbbdev //! State in finite-state machine that controls the worker.
24151c0b2f7Stbbdev /** State diagram:
24251c0b2f7Stbbdev /----------stop---\
24351c0b2f7Stbbdev | ^ |
24451c0b2f7Stbbdev V | |
24551c0b2f7Stbbdev init --> starting --> normal |
24651c0b2f7Stbbdev | | | |
24751c0b2f7Stbbdev | V | |
24851c0b2f7Stbbdev \------> quit <-------/<----/
24951c0b2f7Stbbdev */
25051c0b2f7Stbbdev enum state_t {
25151c0b2f7Stbbdev //! *this is initialized
25251c0b2f7Stbbdev st_init,
25351c0b2f7Stbbdev //! *this has associated thread that is starting up.
25451c0b2f7Stbbdev st_starting,
25551c0b2f7Stbbdev //! Associated thread is doing normal life sequence.
25651c0b2f7Stbbdev st_normal,
25751c0b2f7Stbbdev //! Associated thread is stopped but can be started again.
25851c0b2f7Stbbdev st_stop,
25951c0b2f7Stbbdev //! Associated thread has ended normal life sequence and promises to never touch *this again.
26051c0b2f7Stbbdev st_quit
26151c0b2f7Stbbdev };
26251c0b2f7Stbbdev std::atomic<state_t> my_state;
26351c0b2f7Stbbdev
26451c0b2f7Stbbdev //! Associated server
26551c0b2f7Stbbdev ipc_server& my_server;
26651c0b2f7Stbbdev
26751c0b2f7Stbbdev //! Associated client
26851c0b2f7Stbbdev tbb_client& my_client;
26951c0b2f7Stbbdev
27051c0b2f7Stbbdev //! index used for avoiding the 64K aliasing problem
27151c0b2f7Stbbdev const size_t my_index;
27251c0b2f7Stbbdev
27351c0b2f7Stbbdev //! Monitor for sleeping when there is no work to do.
27451c0b2f7Stbbdev /** The invariant that holds for sleeping workers is:
27551c0b2f7Stbbdev "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
27651c0b2f7Stbbdev ipc_thread_monitor my_thread_monitor;
27751c0b2f7Stbbdev
27851c0b2f7Stbbdev //! Handle of the OS thread associated with this worker
27951c0b2f7Stbbdev thread_handle my_handle;
28051c0b2f7Stbbdev
28151c0b2f7Stbbdev //! Link for list of workers that are sleeping or have no associated thread.
28251c0b2f7Stbbdev ipc_worker* my_next;
28351c0b2f7Stbbdev
28451c0b2f7Stbbdev friend class ipc_server;
28551c0b2f7Stbbdev
28651c0b2f7Stbbdev //! Actions executed by the associated thread
28751c0b2f7Stbbdev void run();
28851c0b2f7Stbbdev
28951c0b2f7Stbbdev //! Wake up associated thread (or launch a thread if there is none)
29051c0b2f7Stbbdev bool wake_or_launch();
29151c0b2f7Stbbdev
29251c0b2f7Stbbdev //! Called by a thread (usually not the associated thread) to commence termination.
29351c0b2f7Stbbdev void start_shutdown(bool join);
29451c0b2f7Stbbdev
29551c0b2f7Stbbdev //! Called by a thread (usually not the associated thread) to commence stopping.
29651c0b2f7Stbbdev void start_stopping(bool join);
29751c0b2f7Stbbdev
29851c0b2f7Stbbdev static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
29951c0b2f7Stbbdev
30051c0b2f7Stbbdev static void release_handle(thread_handle my_handle, bool join);
30151c0b2f7Stbbdev
30251c0b2f7Stbbdev protected:
ipc_worker(ipc_server & server,tbb_client & client,const size_t i)30351c0b2f7Stbbdev ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
30451c0b2f7Stbbdev my_server(server),
30551c0b2f7Stbbdev my_client(client),
30651c0b2f7Stbbdev my_index(i)
30751c0b2f7Stbbdev {
30851c0b2f7Stbbdev my_state = st_init;
30951c0b2f7Stbbdev }
31051c0b2f7Stbbdev };
31151c0b2f7Stbbdev
// TODO: nfs_size from allocator.cpp cannot be reused here because it is a constexpr
// defined in another translation unit, hence this local constant.
//! Granularity, in bytes, to which padded_ipc_worker is padded.
static constexpr size_t cache_line_sz = 128;
31451c0b2f7Stbbdev
31551c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
31651c0b2f7Stbbdev // Suppress overzealous compiler warnings about uninstantiable class
31751c0b2f7Stbbdev #pragma warning(push)
31851c0b2f7Stbbdev #pragma warning(disable:4510 4610)
31951c0b2f7Stbbdev #endif
32051c0b2f7Stbbdev class padded_ipc_worker: public ipc_worker {
32151c0b2f7Stbbdev char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
32251c0b2f7Stbbdev public:
padded_ipc_worker(ipc_server & server,tbb_client & client,const size_t i)32351c0b2f7Stbbdev padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
32451c0b2f7Stbbdev : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
32551c0b2f7Stbbdev };
32651c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
32751c0b2f7Stbbdev #pragma warning(pop)
32851c0b2f7Stbbdev #endif
32951c0b2f7Stbbdev
33051c0b2f7Stbbdev class ipc_waker : public padded_ipc_worker {
33151c0b2f7Stbbdev private:
33251c0b2f7Stbbdev static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
33351c0b2f7Stbbdev void run();
33451c0b2f7Stbbdev bool wake_or_launch();
33551c0b2f7Stbbdev
33651c0b2f7Stbbdev friend class ipc_server;
33751c0b2f7Stbbdev
33851c0b2f7Stbbdev public:
ipc_waker(ipc_server & server,tbb_client & client,const size_t i)33951c0b2f7Stbbdev ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
34051c0b2f7Stbbdev : padded_ipc_worker( server, client, i ) {}
34151c0b2f7Stbbdev };
34251c0b2f7Stbbdev
34351c0b2f7Stbbdev class ipc_stopper : public padded_ipc_worker {
34451c0b2f7Stbbdev private:
34551c0b2f7Stbbdev static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
34651c0b2f7Stbbdev void run();
34751c0b2f7Stbbdev bool wake_or_launch();
34851c0b2f7Stbbdev
34951c0b2f7Stbbdev friend class ipc_server;
35051c0b2f7Stbbdev
35151c0b2f7Stbbdev public:
ipc_stopper(ipc_server & server,tbb_client & client,const size_t i)35251c0b2f7Stbbdev ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
35351c0b2f7Stbbdev : padded_ipc_worker( server, client, i ) {}
35451c0b2f7Stbbdev };
35551c0b2f7Stbbdev
35651c0b2f7Stbbdev class ipc_server: public tbb_server, no_copy {
35751c0b2f7Stbbdev private:
35851c0b2f7Stbbdev tbb_client& my_client;
35951c0b2f7Stbbdev //! Maximum number of threads to be created.
36051c0b2f7Stbbdev /** Threads are created lazily, so maximum might not actually be reached. */
36151c0b2f7Stbbdev tbb_client::size_type my_n_thread;
36251c0b2f7Stbbdev
36351c0b2f7Stbbdev //! Stack size for each thread. */
36451c0b2f7Stbbdev const size_t my_stack_size;
36551c0b2f7Stbbdev
36651c0b2f7Stbbdev //! Number of jobs that could use their associated thread minus number of active threads.
36751c0b2f7Stbbdev /** If negative, indicates oversubscription.
36851c0b2f7Stbbdev If positive, indicates that more threads should run.
36951c0b2f7Stbbdev Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
37051c0b2f7Stbbdev because raising it impacts the invariant for sleeping threads. */
37151c0b2f7Stbbdev std::atomic<int> my_slack;
37251c0b2f7Stbbdev
37351c0b2f7Stbbdev //! Counter used to determine when to delete this.
37451c0b2f7Stbbdev std::atomic<int> my_ref_count;
37551c0b2f7Stbbdev
37651c0b2f7Stbbdev padded_ipc_worker* my_thread_array;
37751c0b2f7Stbbdev
37851c0b2f7Stbbdev //! List of workers that are asleep or committed to sleeping until notified by another thread.
37951c0b2f7Stbbdev std::atomic<ipc_worker*> my_asleep_list_root;
38051c0b2f7Stbbdev
38151c0b2f7Stbbdev //! Protects my_asleep_list_root
38251c0b2f7Stbbdev typedef scheduler_mutex_type asleep_list_mutex_type;
38351c0b2f7Stbbdev asleep_list_mutex_type my_asleep_list_mutex;
38451c0b2f7Stbbdev
38551c0b2f7Stbbdev //! Should server wait workers while terminate
38651c0b2f7Stbbdev const bool my_join_workers;
38751c0b2f7Stbbdev
38851c0b2f7Stbbdev //! Service thread for waking of workers
38951c0b2f7Stbbdev ipc_waker* my_waker;
39051c0b2f7Stbbdev
39151c0b2f7Stbbdev //! Service thread to stop threads
39251c0b2f7Stbbdev ipc_stopper* my_stopper;
39351c0b2f7Stbbdev
39451c0b2f7Stbbdev //! Semaphore to account active threads
39551c0b2f7Stbbdev sem_t* my_active_sem;
39651c0b2f7Stbbdev
39751c0b2f7Stbbdev //! Semaphore to account stop threads
39851c0b2f7Stbbdev sem_t* my_stop_sem;
39951c0b2f7Stbbdev
40051c0b2f7Stbbdev #if TBB_USE_ASSERT
40151c0b2f7Stbbdev std::atomic<int> my_net_slack_requests;
40251c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
40351c0b2f7Stbbdev
40451c0b2f7Stbbdev //! Wake up to two sleeping workers, if there are any sleeping.
40551c0b2f7Stbbdev /** The call is used to propagate a chain reaction where each thread wakes up two threads,
40651c0b2f7Stbbdev which in turn each wake up two threads, etc. */
propagate_chain_reaction()40751c0b2f7Stbbdev void propagate_chain_reaction() {
40851c0b2f7Stbbdev // First test of a double-check idiom. Second test is inside wake_some(0).
40951c0b2f7Stbbdev if( my_slack.load(std::memory_order_acquire)>0 ) {
41051c0b2f7Stbbdev int active_threads = 0;
41151c0b2f7Stbbdev if( try_get_active_thread() ) {
41251c0b2f7Stbbdev ++active_threads;
41351c0b2f7Stbbdev if( try_get_active_thread() ) {
41451c0b2f7Stbbdev ++active_threads;
41551c0b2f7Stbbdev }
41651c0b2f7Stbbdev wake_some( 0, active_threads );
41751c0b2f7Stbbdev }
41851c0b2f7Stbbdev }
41951c0b2f7Stbbdev }
42051c0b2f7Stbbdev
42151c0b2f7Stbbdev //! Try to add t to list of sleeping workers
42251c0b2f7Stbbdev bool try_insert_in_asleep_list(ipc_worker& t);
42351c0b2f7Stbbdev
42451c0b2f7Stbbdev //! Try to add t to list of sleeping workers even if there is some work to do
42551c0b2f7Stbbdev bool try_insert_in_asleep_list_forced(ipc_worker& t);
42651c0b2f7Stbbdev
42751c0b2f7Stbbdev //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
42851c0b2f7Stbbdev void wake_some(int additional_slack, int active_threads);
42951c0b2f7Stbbdev
43051c0b2f7Stbbdev //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
43151c0b2f7Stbbdev void wake_one_forced(int additional_slack);
43251c0b2f7Stbbdev
43351c0b2f7Stbbdev //! Stop one thread from asleep list
43451c0b2f7Stbbdev bool stop_one();
43551c0b2f7Stbbdev
43651c0b2f7Stbbdev //! Wait for active thread
43751c0b2f7Stbbdev bool wait_active_thread();
43851c0b2f7Stbbdev
43951c0b2f7Stbbdev //! Try to get active thread
44051c0b2f7Stbbdev bool try_get_active_thread();
44151c0b2f7Stbbdev
44251c0b2f7Stbbdev //! Release active thread
44351c0b2f7Stbbdev void release_active_thread();
44451c0b2f7Stbbdev
44551c0b2f7Stbbdev //! Wait for thread to stop
44651c0b2f7Stbbdev bool wait_stop_thread();
44751c0b2f7Stbbdev
44851c0b2f7Stbbdev //! Add thread to stop list
44951c0b2f7Stbbdev void add_stop_thread();
45051c0b2f7Stbbdev
remove_server_ref()45151c0b2f7Stbbdev void remove_server_ref() {
45251c0b2f7Stbbdev if( --my_ref_count==0 ) {
45351c0b2f7Stbbdev my_client.acknowledge_close_connection();
45451c0b2f7Stbbdev this->~ipc_server();
45551c0b2f7Stbbdev tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
45651c0b2f7Stbbdev }
45751c0b2f7Stbbdev }
45851c0b2f7Stbbdev
45951c0b2f7Stbbdev friend class ipc_worker;
46051c0b2f7Stbbdev friend class ipc_waker;
46151c0b2f7Stbbdev friend class ipc_stopper;
46251c0b2f7Stbbdev public:
46351c0b2f7Stbbdev ipc_server(tbb_client& client);
46451c0b2f7Stbbdev virtual ~ipc_server();
46551c0b2f7Stbbdev
version() const46651c0b2f7Stbbdev version_type version() const override {
46751c0b2f7Stbbdev return 0;
46851c0b2f7Stbbdev }
46951c0b2f7Stbbdev
request_close_connection(bool)47051c0b2f7Stbbdev void request_close_connection(bool /*exiting*/) override {
47151c0b2f7Stbbdev my_waker->start_shutdown(false);
47251c0b2f7Stbbdev my_stopper->start_shutdown(false);
47351c0b2f7Stbbdev for( size_t i=0; i<my_n_thread; ++i )
47451c0b2f7Stbbdev my_thread_array[i].start_shutdown( my_join_workers );
47551c0b2f7Stbbdev remove_server_ref();
47651c0b2f7Stbbdev }
47751c0b2f7Stbbdev
yield()47851c0b2f7Stbbdev void yield() override {d0::yield();}
47951c0b2f7Stbbdev
independent_thread_number_changed(int)48051c0b2f7Stbbdev void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }
48151c0b2f7Stbbdev
default_concurrency() const48251c0b2f7Stbbdev unsigned default_concurrency() const override { return my_n_thread - 1; }
48351c0b2f7Stbbdev
48451c0b2f7Stbbdev void adjust_job_count_estimate(int delta) override;
48551c0b2f7Stbbdev
48651c0b2f7Stbbdev #if _WIN32||_WIN64
register_external_thread(::rml::server::execution_resource_t &)487b15aabb3Stbbdev void register_external_thread(::rml::server::execution_resource_t&) override {}
unregister_external_thread(::rml::server::execution_resource_t)488b15aabb3Stbbdev void unregister_external_thread(::rml::server::execution_resource_t) override {}
48951c0b2f7Stbbdev #endif /* _WIN32||_WIN64 */
49051c0b2f7Stbbdev };
49151c0b2f7Stbbdev
49251c0b2f7Stbbdev //------------------------------------------------------------------------
49351c0b2f7Stbbdev // Methods of ipc_worker
49451c0b2f7Stbbdev //------------------------------------------------------------------------
49551c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
49651c0b2f7Stbbdev // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
49751c0b2f7Stbbdev #pragma warning(push)
49851c0b2f7Stbbdev #pragma warning(disable:4189)
49951c0b2f7Stbbdev #endif
50051c0b2f7Stbbdev #if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
50151c0b2f7Stbbdev // ensure that stack is properly aligned
50251c0b2f7Stbbdev __attribute__((force_align_arg_pointer))
50351c0b2f7Stbbdev #endif
thread_routine(void * arg)50451c0b2f7Stbbdev __RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
50551c0b2f7Stbbdev ipc_worker* self = static_cast<ipc_worker*>(arg);
50651c0b2f7Stbbdev AVOID_64K_ALIASING( self->my_index );
50751c0b2f7Stbbdev self->run();
50851c0b2f7Stbbdev return 0;
50951c0b2f7Stbbdev }
51051c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
51151c0b2f7Stbbdev #pragma warning(pop)
51251c0b2f7Stbbdev #endif
51351c0b2f7Stbbdev
release_handle(thread_handle handle,bool join)51451c0b2f7Stbbdev void ipc_worker::release_handle(thread_handle handle, bool join) {
51551c0b2f7Stbbdev if( join )
51651c0b2f7Stbbdev ipc_thread_monitor::join( handle );
51751c0b2f7Stbbdev else
51851c0b2f7Stbbdev ipc_thread_monitor::detach_thread( handle );
51951c0b2f7Stbbdev }
52051c0b2f7Stbbdev
start_shutdown(bool join)52151c0b2f7Stbbdev void ipc_worker::start_shutdown(bool join) {
522*5e91b2c0SVladislav Shchapov state_t s = my_state.load(std::memory_order_relaxed);
52351c0b2f7Stbbdev
52451c0b2f7Stbbdev do {
52551c0b2f7Stbbdev __TBB_ASSERT( s!=st_quit, nullptr );
52651c0b2f7Stbbdev } while( !my_state.compare_exchange_strong( s, st_quit ) );
52751c0b2f7Stbbdev if( s==st_normal || s==st_starting ) {
52851c0b2f7Stbbdev // May have invalidated invariant for sleeping, so wake up the thread.
52951c0b2f7Stbbdev // Note that the notify() here occurs without maintaining invariants for my_slack.
53051c0b2f7Stbbdev // It does not matter, because my_state==st_quit overrides checking of my_slack.
53151c0b2f7Stbbdev my_thread_monitor.notify();
53251c0b2f7Stbbdev // Do not need release handle in st_init state,
53351c0b2f7Stbbdev // because in this case the thread wasn't started yet.
53451c0b2f7Stbbdev // For st_starting release is done at launch site.
53551c0b2f7Stbbdev if( s==st_normal )
53651c0b2f7Stbbdev release_handle( my_handle, join );
53751c0b2f7Stbbdev }
53851c0b2f7Stbbdev }
53951c0b2f7Stbbdev
start_stopping(bool join)54051c0b2f7Stbbdev void ipc_worker::start_stopping(bool join) {
541*5e91b2c0SVladislav Shchapov state_t s = my_state.load(std::memory_order_relaxed);
54251c0b2f7Stbbdev
54351c0b2f7Stbbdev while( !my_state.compare_exchange_strong( s, st_quit ) ) {};
54451c0b2f7Stbbdev if( s==st_normal || s==st_starting ) {
54551c0b2f7Stbbdev // May have invalidated invariant for sleeping, so wake up the thread.
54651c0b2f7Stbbdev // Note that the notify() here occurs without maintaining invariants for my_slack.
54751c0b2f7Stbbdev // It does not matter, because my_state==st_quit overrides checking of my_slack.
54851c0b2f7Stbbdev my_thread_monitor.notify();
54951c0b2f7Stbbdev // Do not need release handle in st_init state,
55051c0b2f7Stbbdev // because in this case the thread wasn't started yet.
55151c0b2f7Stbbdev // For st_starting release is done at launch site.
55251c0b2f7Stbbdev if( s==st_normal )
55351c0b2f7Stbbdev release_handle( my_handle, join );
55451c0b2f7Stbbdev }
55551c0b2f7Stbbdev }
55651c0b2f7Stbbdev
run()55751c0b2f7Stbbdev void ipc_worker::run() {
55851c0b2f7Stbbdev my_server.propagate_chain_reaction();
55951c0b2f7Stbbdev
56051c0b2f7Stbbdev // Transiting to st_normal here would require setting my_handle,
56151c0b2f7Stbbdev // which would create race with the launching thread and
56251c0b2f7Stbbdev // complications in handle management on Windows.
56351c0b2f7Stbbdev
56451c0b2f7Stbbdev ::rml::job& j = *my_client.create_one_job();
56551c0b2f7Stbbdev state_t state = my_state.load(std::memory_order_acquire);
56651c0b2f7Stbbdev while( state!=st_quit && state!=st_stop ) {
56751c0b2f7Stbbdev if( my_server.my_slack>=0 ) {
56851c0b2f7Stbbdev my_client.process(j);
56951c0b2f7Stbbdev } else {
57051c0b2f7Stbbdev // Check/set the invariant for sleeping
57118e06f7fSAlex state = my_state.load(std::memory_order_seq_cst);
57251c0b2f7Stbbdev if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
57351c0b2f7Stbbdev if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
57418e06f7fSAlex my_thread_monitor.wait();
57551c0b2f7Stbbdev my_server.propagate_chain_reaction();
57651c0b2f7Stbbdev }
57751c0b2f7Stbbdev }
57818e06f7fSAlex // memory_order_seq_cst to be strictly ordered after thread_monitor::wait
57918e06f7fSAlex state = my_state.load(std::memory_order_seq_cst);
58051c0b2f7Stbbdev }
58151c0b2f7Stbbdev my_client.cleanup(j);
58251c0b2f7Stbbdev
58351c0b2f7Stbbdev my_server.remove_server_ref();
58451c0b2f7Stbbdev }
58551c0b2f7Stbbdev
wake_or_launch()58651c0b2f7Stbbdev inline bool ipc_worker::wake_or_launch() {
58751c0b2f7Stbbdev state_t excepted_stop = st_stop, expected_init = st_init;
58851c0b2f7Stbbdev if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
58951c0b2f7Stbbdev ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( excepted_stop, st_starting ) ) ) {
59051c0b2f7Stbbdev // after this point, remove_server_ref() must be done by created thread
59151c0b2f7Stbbdev #if USE_WINTHREAD
59251c0b2f7Stbbdev my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
59351c0b2f7Stbbdev #elif USE_PTHREAD
59451c0b2f7Stbbdev {
59551c0b2f7Stbbdev affinity_helper fpa;
59651c0b2f7Stbbdev fpa.protect_affinity_mask( /*restore_process_mask=*/true );
59751c0b2f7Stbbdev my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
59851c0b2f7Stbbdev if( my_handle == 0 ) {
59951c0b2f7Stbbdev // Unable to create new thread for process
60051c0b2f7Stbbdev // However, this is expected situation for the use cases of this coordination server
60151c0b2f7Stbbdev state_t s = st_starting;
60251c0b2f7Stbbdev my_state.compare_exchange_strong( s, st_init );
60351c0b2f7Stbbdev if (st_starting != s) {
60451c0b2f7Stbbdev // Do shutdown during startup. my_handle can't be released
60551c0b2f7Stbbdev // by start_shutdown, because my_handle value might be not set yet
60651c0b2f7Stbbdev // at time of transition from st_starting to st_quit.
60751c0b2f7Stbbdev __TBB_ASSERT( s==st_quit, nullptr );
60851c0b2f7Stbbdev release_handle( my_handle, my_server.my_join_workers );
60951c0b2f7Stbbdev }
61051c0b2f7Stbbdev return false;
61151c0b2f7Stbbdev } else {
61251c0b2f7Stbbdev my_server.my_ref_count++;
61351c0b2f7Stbbdev }
61451c0b2f7Stbbdev // Implicit destruction of fpa resets original affinity mask.
61551c0b2f7Stbbdev }
61651c0b2f7Stbbdev #endif /* USE_PTHREAD */
61751c0b2f7Stbbdev state_t s = st_starting;
61851c0b2f7Stbbdev my_state.compare_exchange_strong( s, st_normal );
61951c0b2f7Stbbdev if( st_starting!=s ) {
62051c0b2f7Stbbdev // Do shutdown during startup. my_handle can't be released
62151c0b2f7Stbbdev // by start_shutdown, because my_handle value might be not set yet
62251c0b2f7Stbbdev // at time of transition from st_starting to st_quit.
62351c0b2f7Stbbdev __TBB_ASSERT( s==st_quit, nullptr );
62451c0b2f7Stbbdev release_handle( my_handle, my_server.my_join_workers );
62551c0b2f7Stbbdev }
62651c0b2f7Stbbdev }
62751c0b2f7Stbbdev else {
62851c0b2f7Stbbdev my_thread_monitor.notify();
62951c0b2f7Stbbdev }
63051c0b2f7Stbbdev
63151c0b2f7Stbbdev return true;
63251c0b2f7Stbbdev }
63351c0b2f7Stbbdev
63451c0b2f7Stbbdev //------------------------------------------------------------------------
63551c0b2f7Stbbdev // Methods of ipc_waker
63651c0b2f7Stbbdev //------------------------------------------------------------------------
63751c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
63851c0b2f7Stbbdev // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
63951c0b2f7Stbbdev #pragma warning(push)
64051c0b2f7Stbbdev #pragma warning(disable:4189)
64151c0b2f7Stbbdev #endif
64251c0b2f7Stbbdev #if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
64351c0b2f7Stbbdev // ensure that stack is properly aligned
64451c0b2f7Stbbdev __attribute__((force_align_arg_pointer))
64551c0b2f7Stbbdev #endif
thread_routine(void * arg)64651c0b2f7Stbbdev __RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
64751c0b2f7Stbbdev ipc_waker* self = static_cast<ipc_waker*>(arg);
64851c0b2f7Stbbdev AVOID_64K_ALIASING( self->my_index );
64951c0b2f7Stbbdev self->run();
65051c0b2f7Stbbdev return 0;
65151c0b2f7Stbbdev }
65251c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
65351c0b2f7Stbbdev #pragma warning(pop)
65451c0b2f7Stbbdev #endif
65551c0b2f7Stbbdev
run()65651c0b2f7Stbbdev void ipc_waker::run() {
65751c0b2f7Stbbdev // Transiting to st_normal here would require setting my_handle,
65851c0b2f7Stbbdev // which would create race with the launching thread and
65951c0b2f7Stbbdev // complications in handle management on Windows.
66051c0b2f7Stbbdev
66118e06f7fSAlex // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
66218e06f7fSAlex while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
66351c0b2f7Stbbdev bool have_to_sleep = false;
66451c0b2f7Stbbdev if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
66551c0b2f7Stbbdev if( my_server.wait_active_thread() ) {
66651c0b2f7Stbbdev if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
66751c0b2f7Stbbdev my_server.wake_some( 0, 1 );
66851c0b2f7Stbbdev } else {
66951c0b2f7Stbbdev my_server.release_active_thread();
67051c0b2f7Stbbdev have_to_sleep = true;
67151c0b2f7Stbbdev }
67251c0b2f7Stbbdev }
67351c0b2f7Stbbdev } else {
67451c0b2f7Stbbdev have_to_sleep = true;
67551c0b2f7Stbbdev }
67651c0b2f7Stbbdev if( have_to_sleep ) {
67751c0b2f7Stbbdev // Check/set the invariant for sleeping
67818e06f7fSAlex if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
67918e06f7fSAlex my_thread_monitor.wait();
68051c0b2f7Stbbdev }
68151c0b2f7Stbbdev }
68251c0b2f7Stbbdev }
68351c0b2f7Stbbdev
68451c0b2f7Stbbdev my_server.remove_server_ref();
68551c0b2f7Stbbdev }
68651c0b2f7Stbbdev
wake_or_launch()68751c0b2f7Stbbdev inline bool ipc_waker::wake_or_launch() {
68851c0b2f7Stbbdev state_t excepted = st_init;
68951c0b2f7Stbbdev if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( excepted, st_starting ) ) ) {
69051c0b2f7Stbbdev // after this point, remove_server_ref() must be done by created thread
69151c0b2f7Stbbdev #if USE_WINTHREAD
69251c0b2f7Stbbdev my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
69351c0b2f7Stbbdev #elif USE_PTHREAD
69451c0b2f7Stbbdev {
69551c0b2f7Stbbdev affinity_helper fpa;
69651c0b2f7Stbbdev fpa.protect_affinity_mask( /*restore_process_mask=*/true );
69751c0b2f7Stbbdev my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
69851c0b2f7Stbbdev if( my_handle == 0 ) {
69951c0b2f7Stbbdev runtime_warning( "Unable to create new thread for process %d", getpid() );
70051c0b2f7Stbbdev state_t s = st_starting;
70151c0b2f7Stbbdev my_state.compare_exchange_strong(s, st_init);
70251c0b2f7Stbbdev if (st_starting != s) {
70351c0b2f7Stbbdev // Do shutdown during startup. my_handle can't be released
70451c0b2f7Stbbdev // by start_shutdown, because my_handle value might be not set yet
70551c0b2f7Stbbdev // at time of transition from st_starting to st_quit.
70651c0b2f7Stbbdev __TBB_ASSERT( s==st_quit, nullptr );
70751c0b2f7Stbbdev release_handle( my_handle, my_server.my_join_workers );
70851c0b2f7Stbbdev }
70951c0b2f7Stbbdev return false;
71051c0b2f7Stbbdev } else {
71151c0b2f7Stbbdev my_server.my_ref_count++;
71251c0b2f7Stbbdev }
71351c0b2f7Stbbdev // Implicit destruction of fpa resets original affinity mask.
71451c0b2f7Stbbdev }
71551c0b2f7Stbbdev #endif /* USE_PTHREAD */
71651c0b2f7Stbbdev state_t s = st_starting;
71751c0b2f7Stbbdev my_state.compare_exchange_strong(s, st_normal);
71851c0b2f7Stbbdev if( st_starting!=s ) {
71951c0b2f7Stbbdev // Do shutdown during startup. my_handle can't be released
72051c0b2f7Stbbdev // by start_shutdown, because my_handle value might be not set yet
72151c0b2f7Stbbdev // at time of transition from st_starting to st_quit.
72251c0b2f7Stbbdev __TBB_ASSERT( s==st_quit, nullptr );
72351c0b2f7Stbbdev release_handle( my_handle, my_server.my_join_workers );
72451c0b2f7Stbbdev }
72551c0b2f7Stbbdev }
72651c0b2f7Stbbdev else {
72751c0b2f7Stbbdev my_thread_monitor.notify();
72851c0b2f7Stbbdev }
72951c0b2f7Stbbdev
73051c0b2f7Stbbdev return true;
73151c0b2f7Stbbdev }
73251c0b2f7Stbbdev
73351c0b2f7Stbbdev //------------------------------------------------------------------------
73451c0b2f7Stbbdev // Methods of ipc_stopper
73551c0b2f7Stbbdev //------------------------------------------------------------------------
73651c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
73751c0b2f7Stbbdev // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
73851c0b2f7Stbbdev #pragma warning(push)
73951c0b2f7Stbbdev #pragma warning(disable:4189)
74051c0b2f7Stbbdev #endif
74151c0b2f7Stbbdev #if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
74251c0b2f7Stbbdev // ensure that stack is properly aligned
74351c0b2f7Stbbdev __attribute__((force_align_arg_pointer))
74451c0b2f7Stbbdev #endif
thread_routine(void * arg)74551c0b2f7Stbbdev __RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
74651c0b2f7Stbbdev ipc_stopper* self = static_cast<ipc_stopper*>(arg);
74751c0b2f7Stbbdev AVOID_64K_ALIASING( self->my_index );
74851c0b2f7Stbbdev self->run();
74951c0b2f7Stbbdev return 0;
75051c0b2f7Stbbdev }
75151c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
75251c0b2f7Stbbdev #pragma warning(pop)
75351c0b2f7Stbbdev #endif
75451c0b2f7Stbbdev
run()75551c0b2f7Stbbdev void ipc_stopper::run() {
75651c0b2f7Stbbdev // Transiting to st_normal here would require setting my_handle,
75751c0b2f7Stbbdev // which would create race with the launching thread and
75851c0b2f7Stbbdev // complications in handle management on Windows.
75951c0b2f7Stbbdev
76051c0b2f7Stbbdev while( my_state.load(std::memory_order_acquire)!=st_quit ) {
76151c0b2f7Stbbdev if( my_server.wait_stop_thread() ) {
76251c0b2f7Stbbdev if( my_state.load(std::memory_order_acquire)!=st_quit ) {
76351c0b2f7Stbbdev if( !my_server.stop_one() ) {
76451c0b2f7Stbbdev my_server.add_stop_thread();
76551c0b2f7Stbbdev tbb::detail::r1::prolonged_pause();
76651c0b2f7Stbbdev }
76751c0b2f7Stbbdev }
76851c0b2f7Stbbdev }
76951c0b2f7Stbbdev }
77051c0b2f7Stbbdev
77151c0b2f7Stbbdev my_server.remove_server_ref();
77251c0b2f7Stbbdev }
77351c0b2f7Stbbdev
wake_or_launch()77451c0b2f7Stbbdev inline bool ipc_stopper::wake_or_launch() {
77551c0b2f7Stbbdev state_t excepted = st_init;
77651c0b2f7Stbbdev if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( excepted, st_starting ) ) ) {
77751c0b2f7Stbbdev // after this point, remove_server_ref() must be done by created thread
77851c0b2f7Stbbdev #if USE_WINTHREAD
77951c0b2f7Stbbdev my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
78051c0b2f7Stbbdev #elif USE_PTHREAD
78151c0b2f7Stbbdev {
78251c0b2f7Stbbdev affinity_helper fpa;
78351c0b2f7Stbbdev fpa.protect_affinity_mask( /*restore_process_mask=*/true );
78451c0b2f7Stbbdev my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
78551c0b2f7Stbbdev if( my_handle == 0 ) {
78651c0b2f7Stbbdev runtime_warning( "Unable to create new thread for process %d", getpid() );
78751c0b2f7Stbbdev state_t s = st_starting;
78851c0b2f7Stbbdev my_state.compare_exchange_strong(s, st_init);
78951c0b2f7Stbbdev if (st_starting != s) {
79051c0b2f7Stbbdev // Do shutdown during startup. my_handle can't be released
79151c0b2f7Stbbdev // by start_shutdown, because my_handle value might be not set yet
79251c0b2f7Stbbdev // at time of transition from st_starting to st_quit.
79351c0b2f7Stbbdev __TBB_ASSERT( s==st_quit, nullptr );
79451c0b2f7Stbbdev release_handle( my_handle, my_server.my_join_workers );
79551c0b2f7Stbbdev }
79651c0b2f7Stbbdev return false;
79751c0b2f7Stbbdev } else {
79851c0b2f7Stbbdev my_server.my_ref_count++;
79951c0b2f7Stbbdev }
80051c0b2f7Stbbdev // Implicit destruction of fpa resets original affinity mask.
80151c0b2f7Stbbdev }
80251c0b2f7Stbbdev #endif /* USE_PTHREAD */
80351c0b2f7Stbbdev state_t s = st_starting;
80451c0b2f7Stbbdev my_state.compare_exchange_strong(s, st_normal);
80551c0b2f7Stbbdev if( st_starting!=s ) {
80651c0b2f7Stbbdev // Do shutdown during startup. my_handle can't be released
80751c0b2f7Stbbdev // by start_shutdown, because my_handle value might be not set yet
80851c0b2f7Stbbdev // at time of transition from st_starting to st_quit.
80951c0b2f7Stbbdev __TBB_ASSERT( s==st_quit, nullptr );
81051c0b2f7Stbbdev release_handle( my_handle, my_server.my_join_workers );
81151c0b2f7Stbbdev }
81251c0b2f7Stbbdev }
81351c0b2f7Stbbdev else {
81451c0b2f7Stbbdev my_thread_monitor.notify();
81551c0b2f7Stbbdev }
81651c0b2f7Stbbdev
81751c0b2f7Stbbdev return true;
81851c0b2f7Stbbdev }
81951c0b2f7Stbbdev
82051c0b2f7Stbbdev //------------------------------------------------------------------------
82151c0b2f7Stbbdev // Methods of ipc_server
82251c0b2f7Stbbdev //------------------------------------------------------------------------
ipc_server(tbb_client & client)82351c0b2f7Stbbdev ipc_server::ipc_server(tbb_client& client) :
82451c0b2f7Stbbdev my_client( client ),
82551c0b2f7Stbbdev my_stack_size( client.min_stack_size() ),
82651c0b2f7Stbbdev my_thread_array(nullptr),
82751c0b2f7Stbbdev my_join_workers(false),
82851c0b2f7Stbbdev my_waker(nullptr),
82951c0b2f7Stbbdev my_stopper(nullptr)
83051c0b2f7Stbbdev {
83151c0b2f7Stbbdev my_ref_count = 1;
83251c0b2f7Stbbdev my_slack = 0;
83351c0b2f7Stbbdev #if TBB_USE_ASSERT
83451c0b2f7Stbbdev my_net_slack_requests = 0;
83551c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
83651c0b2f7Stbbdev my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
83751c0b2f7Stbbdev if( my_n_thread==0 ) {
83851c0b2f7Stbbdev my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
83951c0b2f7Stbbdev __TBB_ASSERT( my_n_thread>0, nullptr );
84051c0b2f7Stbbdev }
84151c0b2f7Stbbdev
84251c0b2f7Stbbdev my_asleep_list_root = nullptr;
84351c0b2f7Stbbdev my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
84451c0b2f7Stbbdev for( size_t i=0; i<my_n_thread; ++i ) {
84551c0b2f7Stbbdev ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
84651c0b2f7Stbbdev t->my_next = my_asleep_list_root;
84751c0b2f7Stbbdev my_asleep_list_root = t;
84851c0b2f7Stbbdev }
84951c0b2f7Stbbdev
85051c0b2f7Stbbdev my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
85151c0b2f7Stbbdev new( my_waker ) ipc_waker( *this, client, my_n_thread );
85251c0b2f7Stbbdev
85351c0b2f7Stbbdev my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
85451c0b2f7Stbbdev new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );
85551c0b2f7Stbbdev
85651c0b2f7Stbbdev char* active_sem_name = get_active_sem_name();
85751c0b2f7Stbbdev my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
85851c0b2f7Stbbdev __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
85951c0b2f7Stbbdev delete[] active_sem_name;
86051c0b2f7Stbbdev
86151c0b2f7Stbbdev char* stop_sem_name = get_stop_sem_name();
86251c0b2f7Stbbdev my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
86351c0b2f7Stbbdev __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
86451c0b2f7Stbbdev delete[] stop_sem_name;
86551c0b2f7Stbbdev }
86651c0b2f7Stbbdev
~ipc_server()86751c0b2f7Stbbdev ipc_server::~ipc_server() {
86851c0b2f7Stbbdev __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );
86951c0b2f7Stbbdev
87051c0b2f7Stbbdev for( size_t i=my_n_thread; i--; )
87151c0b2f7Stbbdev my_thread_array[i].~padded_ipc_worker();
87251c0b2f7Stbbdev tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
87351c0b2f7Stbbdev tbb::detail::d0::poison_pointer( my_thread_array );
87451c0b2f7Stbbdev
87551c0b2f7Stbbdev my_waker->~ipc_waker();
87651c0b2f7Stbbdev tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
87751c0b2f7Stbbdev tbb::detail::d0::poison_pointer( my_waker );
87851c0b2f7Stbbdev
87951c0b2f7Stbbdev my_stopper->~ipc_stopper();
88051c0b2f7Stbbdev tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
88151c0b2f7Stbbdev tbb::detail::d0::poison_pointer( my_stopper );
88251c0b2f7Stbbdev
88351c0b2f7Stbbdev sem_close( my_active_sem );
88451c0b2f7Stbbdev sem_close( my_stop_sem );
88551c0b2f7Stbbdev }
88651c0b2f7Stbbdev
try_insert_in_asleep_list(ipc_worker & t)88751c0b2f7Stbbdev inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
88851c0b2f7Stbbdev asleep_list_mutex_type::scoped_lock lock;
88951c0b2f7Stbbdev if( !lock.try_acquire( my_asleep_list_mutex ) )
89051c0b2f7Stbbdev return false;
89151c0b2f7Stbbdev // Contribute to slack under lock so that if another takes that unit of slack,
89251c0b2f7Stbbdev // it sees us sleeping on the list and wakes us up.
89351c0b2f7Stbbdev int k = ++my_slack;
89451c0b2f7Stbbdev if( k<=0 ) {
89551c0b2f7Stbbdev t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
89651c0b2f7Stbbdev my_asleep_list_root.store(&t, std::memory_order_relaxed);
89751c0b2f7Stbbdev return true;
89851c0b2f7Stbbdev } else {
89951c0b2f7Stbbdev --my_slack;
90051c0b2f7Stbbdev return false;
90151c0b2f7Stbbdev }
90251c0b2f7Stbbdev }
90351c0b2f7Stbbdev
try_insert_in_asleep_list_forced(ipc_worker & t)90451c0b2f7Stbbdev inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
90551c0b2f7Stbbdev asleep_list_mutex_type::scoped_lock lock;
90651c0b2f7Stbbdev if( !lock.try_acquire( my_asleep_list_mutex ) )
90751c0b2f7Stbbdev return false;
90851c0b2f7Stbbdev // Contribute to slack under lock so that if another takes that unit of slack,
90951c0b2f7Stbbdev // it sees us sleeping on the list and wakes us up.
91051c0b2f7Stbbdev ++my_slack;
91151c0b2f7Stbbdev t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
91251c0b2f7Stbbdev my_asleep_list_root.store(&t, std::memory_order_relaxed);
91351c0b2f7Stbbdev return true;
91451c0b2f7Stbbdev }
91551c0b2f7Stbbdev
wait_active_thread()91651c0b2f7Stbbdev inline bool ipc_server::wait_active_thread() {
91751c0b2f7Stbbdev if( sem_wait( my_active_sem ) == 0 ) {
91851c0b2f7Stbbdev ++my_global_thread_count;
91951c0b2f7Stbbdev return true;
92051c0b2f7Stbbdev }
92151c0b2f7Stbbdev return false;
92251c0b2f7Stbbdev }
92351c0b2f7Stbbdev
try_get_active_thread()92451c0b2f7Stbbdev inline bool ipc_server::try_get_active_thread() {
92551c0b2f7Stbbdev if( sem_trywait( my_active_sem ) == 0 ) {
92651c0b2f7Stbbdev ++my_global_thread_count;
92751c0b2f7Stbbdev return true;
92851c0b2f7Stbbdev }
92951c0b2f7Stbbdev return false;
93051c0b2f7Stbbdev }
93151c0b2f7Stbbdev
release_active_thread()93251c0b2f7Stbbdev inline void ipc_server::release_active_thread() {
93351c0b2f7Stbbdev release_thread_sem( my_active_sem );
93451c0b2f7Stbbdev }
93551c0b2f7Stbbdev
wait_stop_thread()93651c0b2f7Stbbdev inline bool ipc_server::wait_stop_thread() {
93751c0b2f7Stbbdev struct timespec ts;
93851c0b2f7Stbbdev if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
93951c0b2f7Stbbdev ts.tv_sec++;
94051c0b2f7Stbbdev if( sem_timedwait( my_stop_sem, &ts )==0 ) {
94151c0b2f7Stbbdev return true;
94251c0b2f7Stbbdev }
94351c0b2f7Stbbdev }
94451c0b2f7Stbbdev return false;
94551c0b2f7Stbbdev }
94651c0b2f7Stbbdev
add_stop_thread()94751c0b2f7Stbbdev inline void ipc_server::add_stop_thread() {
94851c0b2f7Stbbdev sem_post( my_stop_sem );
94951c0b2f7Stbbdev }
95051c0b2f7Stbbdev
wake_some(int additional_slack,int active_threads)95151c0b2f7Stbbdev void ipc_server::wake_some( int additional_slack, int active_threads ) {
95251c0b2f7Stbbdev __TBB_ASSERT( additional_slack>=0, nullptr );
95351c0b2f7Stbbdev ipc_worker* wakee[2];
95451c0b2f7Stbbdev ipc_worker **w = wakee;
95551c0b2f7Stbbdev {
95651c0b2f7Stbbdev asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
95751c0b2f7Stbbdev while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
95851c0b2f7Stbbdev if( additional_slack>0 ) {
95951c0b2f7Stbbdev if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
96051c0b2f7Stbbdev break;
96151c0b2f7Stbbdev --additional_slack;
96251c0b2f7Stbbdev } else {
96351c0b2f7Stbbdev // Chain reaction; Try to claim unit of slack
96451c0b2f7Stbbdev int old;
96551c0b2f7Stbbdev do {
96651c0b2f7Stbbdev old = my_slack.load(std::memory_order_relaxed);
96751c0b2f7Stbbdev if( old<=0 ) goto done;
96851c0b2f7Stbbdev } while( !my_slack.compare_exchange_strong( old, old-1 ) );
96951c0b2f7Stbbdev }
97051c0b2f7Stbbdev // Pop sleeping worker to combine with claimed unit of slack
97151c0b2f7Stbbdev my_asleep_list_root.store(
97251c0b2f7Stbbdev (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
97351c0b2f7Stbbdev std::memory_order_relaxed
97451c0b2f7Stbbdev );
97551c0b2f7Stbbdev --active_threads;
97651c0b2f7Stbbdev }
97751c0b2f7Stbbdev if( additional_slack ) {
97851c0b2f7Stbbdev // Contribute our unused slack to my_slack.
97951c0b2f7Stbbdev my_slack += additional_slack;
98051c0b2f7Stbbdev }
98151c0b2f7Stbbdev }
98251c0b2f7Stbbdev done:
98351c0b2f7Stbbdev while( w>wakee ) {
98451c0b2f7Stbbdev if( !(*--w)->wake_or_launch() ) {
98551c0b2f7Stbbdev add_stop_thread();
98651c0b2f7Stbbdev do {
98751c0b2f7Stbbdev } while( !try_insert_in_asleep_list_forced(**w) );
98851c0b2f7Stbbdev release_active_thread();
98951c0b2f7Stbbdev }
99051c0b2f7Stbbdev }
99151c0b2f7Stbbdev while( active_threads ) {
99251c0b2f7Stbbdev release_active_thread();
99351c0b2f7Stbbdev --active_threads;
99451c0b2f7Stbbdev }
99551c0b2f7Stbbdev }
99651c0b2f7Stbbdev
wake_one_forced(int additional_slack)99751c0b2f7Stbbdev void ipc_server::wake_one_forced( int additional_slack ) {
99851c0b2f7Stbbdev __TBB_ASSERT( additional_slack>=0, nullptr );
99951c0b2f7Stbbdev ipc_worker* wakee[1];
100051c0b2f7Stbbdev ipc_worker **w = wakee;
100151c0b2f7Stbbdev {
100251c0b2f7Stbbdev asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
100351c0b2f7Stbbdev while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
100451c0b2f7Stbbdev if( additional_slack>0 ) {
100551c0b2f7Stbbdev if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
100651c0b2f7Stbbdev break;
100751c0b2f7Stbbdev --additional_slack;
100851c0b2f7Stbbdev } else {
100951c0b2f7Stbbdev // Chain reaction; Try to claim unit of slack
101051c0b2f7Stbbdev int old;
101151c0b2f7Stbbdev do {
101251c0b2f7Stbbdev old = my_slack.load(std::memory_order_relaxed);
101351c0b2f7Stbbdev if( old<=0 ) goto done;
101451c0b2f7Stbbdev } while( !my_slack.compare_exchange_strong( old, old-1 ) );
101551c0b2f7Stbbdev }
101651c0b2f7Stbbdev // Pop sleeping worker to combine with claimed unit of slack
101751c0b2f7Stbbdev my_asleep_list_root.store(
101851c0b2f7Stbbdev (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
101951c0b2f7Stbbdev std::memory_order_relaxed);
102051c0b2f7Stbbdev }
102151c0b2f7Stbbdev if( additional_slack ) {
102251c0b2f7Stbbdev // Contribute our unused slack to my_slack.
102351c0b2f7Stbbdev my_slack += additional_slack;
102451c0b2f7Stbbdev }
102551c0b2f7Stbbdev }
102651c0b2f7Stbbdev done:
102751c0b2f7Stbbdev while( w>wakee ) {
102851c0b2f7Stbbdev if( !(*--w)->wake_or_launch() ) {
102951c0b2f7Stbbdev add_stop_thread();
103051c0b2f7Stbbdev do {
103151c0b2f7Stbbdev } while( !try_insert_in_asleep_list_forced(**w) );
103251c0b2f7Stbbdev }
103351c0b2f7Stbbdev }
103451c0b2f7Stbbdev }
103551c0b2f7Stbbdev
stop_one()103651c0b2f7Stbbdev bool ipc_server::stop_one() {
103751c0b2f7Stbbdev ipc_worker* current = nullptr;
103851c0b2f7Stbbdev ipc_worker* next = nullptr;
103951c0b2f7Stbbdev {
104051c0b2f7Stbbdev asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
104151c0b2f7Stbbdev if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
104251c0b2f7Stbbdev current = my_asleep_list_root.load(std::memory_order_relaxed);
104351c0b2f7Stbbdev if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
104451c0b2f7Stbbdev next = current->my_next;
104551c0b2f7Stbbdev while( next!= nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
104651c0b2f7Stbbdev current = next;
104751c0b2f7Stbbdev next = current->my_next;
104851c0b2f7Stbbdev }
104951c0b2f7Stbbdev current->start_stopping( my_join_workers );
105051c0b2f7Stbbdev return true;
105151c0b2f7Stbbdev }
105251c0b2f7Stbbdev }
105351c0b2f7Stbbdev }
105451c0b2f7Stbbdev return false;
105551c0b2f7Stbbdev }
105651c0b2f7Stbbdev
adjust_job_count_estimate(int delta)105751c0b2f7Stbbdev void ipc_server::adjust_job_count_estimate( int delta ) {
105851c0b2f7Stbbdev #if TBB_USE_ASSERT
105951c0b2f7Stbbdev my_net_slack_requests+=delta;
106051c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
106151c0b2f7Stbbdev if( my_n_thread > 1 ) {
106251c0b2f7Stbbdev if( delta<0 ) {
106351c0b2f7Stbbdev my_slack+=delta;
106451c0b2f7Stbbdev } else if( delta>0 ) {
106551c0b2f7Stbbdev int active_threads = 0;
106651c0b2f7Stbbdev if( try_get_active_thread() ) {
106751c0b2f7Stbbdev ++active_threads;
106851c0b2f7Stbbdev if( try_get_active_thread() ) {
106951c0b2f7Stbbdev ++active_threads;
107051c0b2f7Stbbdev }
107151c0b2f7Stbbdev }
107251c0b2f7Stbbdev wake_some( delta, active_threads );
107351c0b2f7Stbbdev
107451c0b2f7Stbbdev if( !my_waker->wake_or_launch() ) {
107551c0b2f7Stbbdev add_stop_thread();
107651c0b2f7Stbbdev }
107751c0b2f7Stbbdev if( !my_stopper->wake_or_launch() ) {
107851c0b2f7Stbbdev add_stop_thread();
107951c0b2f7Stbbdev }
108051c0b2f7Stbbdev }
108151c0b2f7Stbbdev } else { // Corner case when RML shouldn't provide any worker thread but client has to have at least one
108251c0b2f7Stbbdev if( delta<0 ) {
108351c0b2f7Stbbdev my_slack += delta;
108451c0b2f7Stbbdev } else {
108551c0b2f7Stbbdev wake_one_forced( delta );
108651c0b2f7Stbbdev }
108751c0b2f7Stbbdev }
108851c0b2f7Stbbdev }
108951c0b2f7Stbbdev
109051c0b2f7Stbbdev //------------------------------------------------------------------------
109151c0b2f7Stbbdev // RML factory methods
109251c0b2f7Stbbdev //------------------------------------------------------------------------
109351c0b2f7Stbbdev
109451c0b2f7Stbbdev #if USE_PTHREAD
109551c0b2f7Stbbdev
109651c0b2f7Stbbdev static tbb_client* my_global_client = nullptr;
109751c0b2f7Stbbdev static tbb_server* my_global_server = nullptr;
109851c0b2f7Stbbdev
rml_atexit()109951c0b2f7Stbbdev void rml_atexit() {
110051c0b2f7Stbbdev release_resources();
110151c0b2f7Stbbdev }
110251c0b2f7Stbbdev
rml_atfork_child()110351c0b2f7Stbbdev void rml_atfork_child() {
110451c0b2f7Stbbdev if( my_global_server!=nullptr && my_global_client!=nullptr ) {
110551c0b2f7Stbbdev ipc_server* server = static_cast<ipc_server*>( my_global_server );
110651c0b2f7Stbbdev server->~ipc_server();
110751c0b2f7Stbbdev // memset( server, 0, sizeof(ipc_server) );
110851c0b2f7Stbbdev new( server ) ipc_server( *my_global_client );
110951c0b2f7Stbbdev pthread_atfork( nullptr, nullptr, rml_atfork_child );
111051c0b2f7Stbbdev atexit( rml_atexit );
111151c0b2f7Stbbdev }
111251c0b2f7Stbbdev }
111351c0b2f7Stbbdev
111451c0b2f7Stbbdev #endif /* USE_PTHREAD */
111551c0b2f7Stbbdev
__TBB_make_rml_server(tbb_factory &,tbb_server * & server,tbb_client & client)111651c0b2f7Stbbdev extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
111751c0b2f7Stbbdev server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
111851c0b2f7Stbbdev #if USE_PTHREAD
111951c0b2f7Stbbdev my_global_client = &client;
112051c0b2f7Stbbdev my_global_server = server;
112151c0b2f7Stbbdev pthread_atfork( nullptr, nullptr, rml_atfork_child );
112251c0b2f7Stbbdev atexit( rml_atexit );
112351c0b2f7Stbbdev #endif /* USE_PTHREAD */
112451c0b2f7Stbbdev if( getenv( "RML_DEBUG" ) ) {
112551c0b2f7Stbbdev runtime_warning("IPC server is started");
112651c0b2f7Stbbdev }
112751c0b2f7Stbbdev return tbb_factory::st_success;
112851c0b2f7Stbbdev }
112951c0b2f7Stbbdev
__TBB_call_with_my_server_info(::rml::server_info_callback_t,void *)113051c0b2f7Stbbdev extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
113151c0b2f7Stbbdev }
113251c0b2f7Stbbdev
113351c0b2f7Stbbdev } // namespace rml
113451c0b2f7Stbbdev } // namespace detail
113551c0b2f7Stbbdev
113651c0b2f7Stbbdev } // namespace tbb
1137