xref: /oneTBB/python/rml/ipc_server.cpp (revision b15aabb3)
151c0b2f7Stbbdev /*
2*b15aabb3Stbbdev     Copyright (c) 2017-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include <atomic>
1851c0b2f7Stbbdev #include <cstring>
1951c0b2f7Stbbdev #include <cstdlib>
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "../../src/tbb/rml_tbb.h"
2251c0b2f7Stbbdev #include "../../src/tbb/rml_thread_monitor.h"
2351c0b2f7Stbbdev #include "../../src/tbb/scheduler_common.h"
2451c0b2f7Stbbdev #include "../../src/tbb/governor.h"
2551c0b2f7Stbbdev #include "../../src/tbb/misc.h"
2651c0b2f7Stbbdev #include "tbb/cache_aligned_allocator.h"
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev #include "ipc_utils.h"
2951c0b2f7Stbbdev 
3051c0b2f7Stbbdev #include <fcntl.h>
3151c0b2f7Stbbdev #include <stdlib.h>
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev namespace rml {
3451c0b2f7Stbbdev namespace internal {
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev typedef versioned_object::version_type version_type;
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
4151c0b2f7Stbbdev     if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
4251c0b2f7Stbbdev         return factory::st_incompatible;
4351c0b2f7Stbbdev     }
4451c0b2f7Stbbdev 
4551c0b2f7Stbbdev     // Hack to keep this library from being closed
4651c0b2f7Stbbdev     static std::atomic<bool> one_time_flag{false};
4751c0b2f7Stbbdev     bool expected = false;
4851c0b2f7Stbbdev 
4951c0b2f7Stbbdev     if( one_time_flag.compare_exchange_strong(expected, true) ) {
5051c0b2f7Stbbdev         __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
5151c0b2f7Stbbdev #if _WIN32||_WIN64
5251c0b2f7Stbbdev         f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
5351c0b2f7Stbbdev #else
5451c0b2f7Stbbdev         f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
5551c0b2f7Stbbdev #endif
5651c0b2f7Stbbdev     }
5751c0b2f7Stbbdev     // End of hack
5851c0b2f7Stbbdev 
5951c0b2f7Stbbdev     return factory::st_success;
6051c0b2f7Stbbdev }
6151c0b2f7Stbbdev 
6251c0b2f7Stbbdev extern "C" void __RML_close_factory(factory& /*f*/) {
6351c0b2f7Stbbdev }
6451c0b2f7Stbbdev 
6551c0b2f7Stbbdev class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
6651c0b2f7Stbbdev public:
6751c0b2f7Stbbdev     ipc_thread_monitor() : thread_monitor() {}
6851c0b2f7Stbbdev 
6951c0b2f7Stbbdev #if USE_WINTHREAD
7051c0b2f7Stbbdev #elif USE_PTHREAD
7151c0b2f7Stbbdev     static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
7251c0b2f7Stbbdev #endif
7351c0b2f7Stbbdev };
7451c0b2f7Stbbdev 
7551c0b2f7Stbbdev #if USE_WINTHREAD
7651c0b2f7Stbbdev #elif USE_PTHREAD
7751c0b2f7Stbbdev inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
7851c0b2f7Stbbdev     pthread_attr_t s;
7951c0b2f7Stbbdev     if( pthread_attr_init( &s ) ) return 0;
8051c0b2f7Stbbdev     if( stack_size>0 ) {
8151c0b2f7Stbbdev         if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
8251c0b2f7Stbbdev     }
8351c0b2f7Stbbdev     pthread_t handle;
8451c0b2f7Stbbdev     if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
8551c0b2f7Stbbdev     if( pthread_attr_destroy( &s ) ) return 0;
8651c0b2f7Stbbdev     return handle;
8751c0b2f7Stbbdev }
8851c0b2f7Stbbdev #endif
8951c0b2f7Stbbdev 
9051c0b2f7Stbbdev }} // rml::internal
9151c0b2f7Stbbdev 
9251c0b2f7Stbbdev using rml::internal::ipc_thread_monitor;
9351c0b2f7Stbbdev using tbb::internal::rml::get_shared_name;
9451c0b2f7Stbbdev 
9551c0b2f7Stbbdev namespace tbb {
9651c0b2f7Stbbdev namespace detail {
9751c0b2f7Stbbdev 
9851c0b2f7Stbbdev namespace r1 {
9951c0b2f7Stbbdev bool terminate_on_exception() {
10051c0b2f7Stbbdev     return false;
10151c0b2f7Stbbdev }
10251c0b2f7Stbbdev }
10351c0b2f7Stbbdev 
10451c0b2f7Stbbdev namespace rml {
10551c0b2f7Stbbdev 
10651c0b2f7Stbbdev typedef ipc_thread_monitor::handle_type thread_handle;
10751c0b2f7Stbbdev 
10851c0b2f7Stbbdev class ipc_server;
10951c0b2f7Stbbdev 
11051c0b2f7Stbbdev static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
11151c0b2f7Stbbdev static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
11251c0b2f7Stbbdev static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
11351c0b2f7Stbbdev static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
11451c0b2f7Stbbdev static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
11551c0b2f7Stbbdev static const mode_t IPC_SEM_MODE = 0660;
11651c0b2f7Stbbdev 
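// A minimal configuration sketch (illustrative only, not part of the original
// sources): the IPC coordination layer is driven through environment variables
// and the exported helpers below, so a launcher could prepare a group of
// cooperating processes roughly like this before they load the module.  The
// exact environment variable spelling and accepted values are resolved by the
// helpers declared in ipc_utils.h (get_enable_flag/get_num_threads/
// get_shared_name), so treat the names and values here as an assumption:
//
//     setenv("IPC_ENABLE", "1", 1);   // let __RML_open_factory succeed
//     setenv("MAX_THREADS", "8", 1);  // cap the shared worker pool
//     set_active_sem_name();          // publish unique semaphore names
//     set_stop_sem_name();            //   for this group of processes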
11751c0b2f7Stbbdev static std::atomic<int> my_global_thread_count;
11851c0b2f7Stbbdev using tbb_client = tbb::detail::r1::rml::tbb_client;
11951c0b2f7Stbbdev using tbb_server = tbb::detail::r1::rml::tbb_server;
12051c0b2f7Stbbdev using tbb_factory = tbb::detail::r1::rml::tbb_factory;
12151c0b2f7Stbbdev 
12251c0b2f7Stbbdev using tbb::detail::r1::runtime_warning;
12351c0b2f7Stbbdev 
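// Returns the semaphore name to use: the value of the environment variable
// `name` if it is set and non-empty (copied into a new[]-allocated buffer),
// otherwise a shared name derived from `prefix` via get_shared_name().
// Callers are expected to delete[] the returned string.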
12451c0b2f7Stbbdev char* get_sem_name(const char* name, const char* prefix) {
12551c0b2f7Stbbdev     __TBB_ASSERT(name != nullptr, nullptr);
12651c0b2f7Stbbdev     __TBB_ASSERT(prefix != nullptr, nullptr);
12751c0b2f7Stbbdev     char* value = std::getenv(name);
12851c0b2f7Stbbdev     std::size_t len = value == nullptr ? 0 : std::strlen(value);
12951c0b2f7Stbbdev     if (len > 0) {
13051c0b2f7Stbbdev         // TODO: consider returning the original string instead of the copied string.
13151c0b2f7Stbbdev         char* sem_name = new char[len + 1];
13251c0b2f7Stbbdev         __TBB_ASSERT(sem_name != nullptr, nullptr);
13351c0b2f7Stbbdev         std::strncpy(sem_name, value, len+1);
13451c0b2f7Stbbdev         __TBB_ASSERT(sem_name[len] == 0, nullptr);
13551c0b2f7Stbbdev         return sem_name;
13651c0b2f7Stbbdev     } else {
13751c0b2f7Stbbdev         return get_shared_name(prefix);
13851c0b2f7Stbbdev     }
13951c0b2f7Stbbdev }
14051c0b2f7Stbbdev 
14151c0b2f7Stbbdev char* get_active_sem_name() {
14251c0b2f7Stbbdev     return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
14351c0b2f7Stbbdev }
14451c0b2f7Stbbdev 
14551c0b2f7Stbbdev char* get_stop_sem_name() {
14651c0b2f7Stbbdev     return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
14751c0b2f7Stbbdev }
14851c0b2f7Stbbdev 
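// Returns one active-thread slot to the given semaphore, but only if this
// process still accounts for at least one thread in my_global_thread_count;
// the counter is never driven below zero.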
14951c0b2f7Stbbdev static void release_thread_sem(sem_t* my_sem) {
15051c0b2f7Stbbdev     int old = my_global_thread_count.load(std::memory_order_relaxed);
15151c0b2f7Stbbdev     do {
15251c0b2f7Stbbdev         if( old<=0 ) return;
15351c0b2f7Stbbdev     } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
15451c0b2f7Stbbdev     if( old>0 ) {
15551c0b2f7Stbbdev         sem_post( my_sem );
15651c0b2f7Stbbdev     }
15751c0b2f7Stbbdev }
15851c0b2f7Stbbdev 
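// Builds a unique semaphore name of the form "<prefix>_XXXXXX", where the
// suffix is filled in by mktemp, and publishes it in the environment variable
// `name` so that related processes spawned afterwards pick up the same name.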
15951c0b2f7Stbbdev void set_sem_name(const char* name, const char* prefix) {
16051c0b2f7Stbbdev     __TBB_ASSERT(name != nullptr, nullptr);
16151c0b2f7Stbbdev     __TBB_ASSERT(prefix != nullptr, nullptr);
16251c0b2f7Stbbdev     const char* postfix = "_XXXXXX";
16351c0b2f7Stbbdev     std::size_t plen = std::strlen(prefix);
16451c0b2f7Stbbdev     std::size_t xlen = std::strlen(postfix);
16551c0b2f7Stbbdev     char* templ = new char[plen + xlen + 1];
16651c0b2f7Stbbdev     __TBB_ASSERT(templ != nullptr, nullptr);
16751c0b2f7Stbbdev     strncpy(templ, prefix, plen+1);
16851c0b2f7Stbbdev     __TBB_ASSERT(templ[plen] == 0, nullptr);
16951c0b2f7Stbbdev     strncat(templ, postfix, xlen + 1);
17051c0b2f7Stbbdev     __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
17151c0b2f7Stbbdev     // TODO: consider using mkstemp instead of mktemp.
17251c0b2f7Stbbdev     char* sem_name = mktemp(templ);
17351c0b2f7Stbbdev     if (sem_name != nullptr) {
17451c0b2f7Stbbdev         int status = setenv(name, sem_name,  /*overwrite*/ 1);
17551c0b2f7Stbbdev         __TBB_ASSERT_EX(status == 0, nullptr);
17651c0b2f7Stbbdev     }
17751c0b2f7Stbbdev     delete[] templ;
17851c0b2f7Stbbdev }
17951c0b2f7Stbbdev 
18051c0b2f7Stbbdev extern "C" void set_active_sem_name() {
18151c0b2f7Stbbdev     set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
18251c0b2f7Stbbdev }
18351c0b2f7Stbbdev 
18451c0b2f7Stbbdev extern "C" void set_stop_sem_name() {
18551c0b2f7Stbbdev     set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
18651c0b2f7Stbbdev }
18751c0b2f7Stbbdev 
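// Called from rml_atexit() at process exit: returns every active-thread slot
// this process still accounts for to the shared semaphore, so that other
// processes in the group are not starved of threads.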
18851c0b2f7Stbbdev extern "C" void release_resources() {
18951c0b2f7Stbbdev     if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
19051c0b2f7Stbbdev         char* active_sem_name = get_active_sem_name();
19151c0b2f7Stbbdev         sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
19251c0b2f7Stbbdev         __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
19351c0b2f7Stbbdev         delete[] active_sem_name;
19451c0b2f7Stbbdev 
19551c0b2f7Stbbdev         do {
19651c0b2f7Stbbdev             release_thread_sem( my_active_sem );
19751c0b2f7Stbbdev         } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
19851c0b2f7Stbbdev     }
19951c0b2f7Stbbdev }
20051c0b2f7Stbbdev 
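// Unlinks both named semaphores.  A missing semaphore (ENOENT) is not treated
// as an error; any other failure is reported via runtime_warning.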
20151c0b2f7Stbbdev extern "C" void release_semaphores() {
20251c0b2f7Stbbdev     int status = 0;
20351c0b2f7Stbbdev     char* sem_name = nullptr;
20451c0b2f7Stbbdev 
20551c0b2f7Stbbdev     sem_name = get_active_sem_name();
20651c0b2f7Stbbdev     if( sem_name==nullptr ) {
20751c0b2f7Stbbdev         runtime_warning("Can not get RML semaphore name");
20851c0b2f7Stbbdev         return;
20951c0b2f7Stbbdev     }
21051c0b2f7Stbbdev     status = sem_unlink( sem_name );
21151c0b2f7Stbbdev     if( status!=0 ) {
21251c0b2f7Stbbdev         if( errno==ENOENT ) {
21351c0b2f7Stbbdev             /* There is no semaphore with the given name, nothing to do */
21451c0b2f7Stbbdev         } else {
21551c0b2f7Stbbdev             runtime_warning("Can not release RML semaphore");
            delete[] sem_name;
21651c0b2f7Stbbdev             return;
21751c0b2f7Stbbdev         }
21851c0b2f7Stbbdev     }
21951c0b2f7Stbbdev     delete[] sem_name;
22051c0b2f7Stbbdev 
22151c0b2f7Stbbdev     sem_name = get_stop_sem_name();
22251c0b2f7Stbbdev     if( sem_name==nullptr ) {
22351c0b2f7Stbbdev         runtime_warning( "Can not get RML semaphore name" );
22451c0b2f7Stbbdev         return;
22551c0b2f7Stbbdev     }
22651c0b2f7Stbbdev     status = sem_unlink( sem_name );
22751c0b2f7Stbbdev     if( status!=0 ) {
22851c0b2f7Stbbdev         if( errno==ENOENT ) {
22951c0b2f7Stbbdev             /* There is no semaphore with the given name, nothing to do */
23051c0b2f7Stbbdev         } else {
23151c0b2f7Stbbdev             runtime_warning("Can not release RML semaphore");
            delete[] sem_name;
23251c0b2f7Stbbdev             return;
23351c0b2f7Stbbdev         }
23451c0b2f7Stbbdev     }
23551c0b2f7Stbbdev     delete[] sem_name;
23651c0b2f7Stbbdev }
23751c0b2f7Stbbdev 
23851c0b2f7Stbbdev class ipc_worker: no_copy {
23951c0b2f7Stbbdev protected:
24051c0b2f7Stbbdev     //! State in finite-state machine that controls the worker.
24151c0b2f7Stbbdev     /** State diagram:
24251c0b2f7Stbbdev                     /----------stop---\
24351c0b2f7Stbbdev                     |           ^     |
24451c0b2f7Stbbdev                     V           |     |
24551c0b2f7Stbbdev         init --> starting --> normal  |
24651c0b2f7Stbbdev           |         |           |     |
24751c0b2f7Stbbdev           |         V           |     |
24851c0b2f7Stbbdev           \------> quit <-------/<----/
24951c0b2f7Stbbdev       */
25051c0b2f7Stbbdev     enum state_t {
25151c0b2f7Stbbdev         //! *this is initialized
25251c0b2f7Stbbdev         st_init,
25351c0b2f7Stbbdev         //! *this has associated thread that is starting up.
25451c0b2f7Stbbdev         st_starting,
25551c0b2f7Stbbdev         //! Associated thread is doing normal life sequence.
25651c0b2f7Stbbdev         st_normal,
25751c0b2f7Stbbdev         //! Associated thread is stopped but can be started again.
25851c0b2f7Stbbdev         st_stop,
25951c0b2f7Stbbdev         //! Associated thread has ended normal life sequence and promises to never touch *this again.
26051c0b2f7Stbbdev         st_quit
26151c0b2f7Stbbdev     };
26251c0b2f7Stbbdev     std::atomic<state_t> my_state;
26351c0b2f7Stbbdev 
26451c0b2f7Stbbdev     //! Associated server
26551c0b2f7Stbbdev     ipc_server& my_server;
26651c0b2f7Stbbdev 
26751c0b2f7Stbbdev     //! Associated client
26851c0b2f7Stbbdev     tbb_client& my_client;
26951c0b2f7Stbbdev 
27051c0b2f7Stbbdev     //! Index used for avoiding the 64K aliasing problem
27151c0b2f7Stbbdev     const size_t my_index;
27251c0b2f7Stbbdev 
27351c0b2f7Stbbdev     //! Monitor for sleeping when there is no work to do.
27451c0b2f7Stbbdev     /** The invariant that holds for sleeping workers is:
27551c0b2f7Stbbdev         "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
27651c0b2f7Stbbdev     ipc_thread_monitor my_thread_monitor;
27751c0b2f7Stbbdev 
27851c0b2f7Stbbdev     //! Handle of the OS thread associated with this worker
27951c0b2f7Stbbdev     thread_handle my_handle;
28051c0b2f7Stbbdev 
28151c0b2f7Stbbdev     //! Link for list of workers that are sleeping or have no associated thread.
28251c0b2f7Stbbdev     ipc_worker* my_next;
28351c0b2f7Stbbdev 
28451c0b2f7Stbbdev     friend class ipc_server;
28551c0b2f7Stbbdev 
28651c0b2f7Stbbdev     //! Actions executed by the associated thread
28751c0b2f7Stbbdev     void run();
28851c0b2f7Stbbdev 
28951c0b2f7Stbbdev     //! Wake up associated thread (or launch a thread if there is none)
29051c0b2f7Stbbdev     bool wake_or_launch();
29151c0b2f7Stbbdev 
29251c0b2f7Stbbdev     //! Called by a thread (usually not the associated thread) to commence termination.
29351c0b2f7Stbbdev     void start_shutdown(bool join);
29451c0b2f7Stbbdev 
29551c0b2f7Stbbdev     //! Called by a thread (usually not the associated thread) to commence stopping.
29651c0b2f7Stbbdev     void start_stopping(bool join);
29751c0b2f7Stbbdev 
29851c0b2f7Stbbdev     static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
29951c0b2f7Stbbdev 
30051c0b2f7Stbbdev     static void release_handle(thread_handle my_handle, bool join);
30151c0b2f7Stbbdev 
30251c0b2f7Stbbdev protected:
30351c0b2f7Stbbdev     ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
30451c0b2f7Stbbdev         my_server(server),
30551c0b2f7Stbbdev         my_client(client),
30651c0b2f7Stbbdev         my_index(i)
30751c0b2f7Stbbdev     {
30851c0b2f7Stbbdev         my_state = st_init;
30951c0b2f7Stbbdev     }
31051c0b2f7Stbbdev };
31151c0b2f7Stbbdev 
31251c0b2f7Stbbdev //TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
31351c0b2f7Stbbdev constexpr static size_t cache_line_sz = 128;
31451c0b2f7Stbbdev 
31551c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
31651c0b2f7Stbbdev     // Suppress overzealous compiler warnings about uninstantiable class
31751c0b2f7Stbbdev     #pragma warning(push)
31851c0b2f7Stbbdev     #pragma warning(disable:4510 4610)
31951c0b2f7Stbbdev #endif
32051c0b2f7Stbbdev class padded_ipc_worker: public ipc_worker {
32151c0b2f7Stbbdev     char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
32251c0b2f7Stbbdev public:
32351c0b2f7Stbbdev     padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
32451c0b2f7Stbbdev     : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
32551c0b2f7Stbbdev };
32651c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
32751c0b2f7Stbbdev     #pragma warning(pop)
32851c0b2f7Stbbdev #endif
32951c0b2f7Stbbdev 
33051c0b2f7Stbbdev class ipc_waker : public padded_ipc_worker {
33151c0b2f7Stbbdev private:
33251c0b2f7Stbbdev     static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
33351c0b2f7Stbbdev     void run();
33451c0b2f7Stbbdev     bool wake_or_launch();
33551c0b2f7Stbbdev 
33651c0b2f7Stbbdev     friend class ipc_server;
33751c0b2f7Stbbdev 
33851c0b2f7Stbbdev public:
33951c0b2f7Stbbdev     ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
34051c0b2f7Stbbdev     : padded_ipc_worker( server, client, i ) {}
34151c0b2f7Stbbdev };
34251c0b2f7Stbbdev 
34351c0b2f7Stbbdev class ipc_stopper : public padded_ipc_worker {
34451c0b2f7Stbbdev private:
34551c0b2f7Stbbdev     static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
34651c0b2f7Stbbdev     void run();
34751c0b2f7Stbbdev     bool wake_or_launch();
34851c0b2f7Stbbdev 
34951c0b2f7Stbbdev     friend class ipc_server;
35051c0b2f7Stbbdev 
35151c0b2f7Stbbdev public:
35251c0b2f7Stbbdev     ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
35351c0b2f7Stbbdev     : padded_ipc_worker( server, client, i ) {}
35451c0b2f7Stbbdev };
35551c0b2f7Stbbdev 
35651c0b2f7Stbbdev class ipc_server: public tbb_server, no_copy {
35751c0b2f7Stbbdev private:
35851c0b2f7Stbbdev     tbb_client& my_client;
35951c0b2f7Stbbdev     //! Maximum number of threads to be created.
36051c0b2f7Stbbdev     /** Threads are created lazily, so maximum might not actually be reached. */
36151c0b2f7Stbbdev     tbb_client::size_type my_n_thread;
36251c0b2f7Stbbdev 
36351c0b2f7Stbbdev     //! Stack size for each thread.
36451c0b2f7Stbbdev     const size_t my_stack_size;
36551c0b2f7Stbbdev 
36651c0b2f7Stbbdev     //! Number of jobs that could use their associated thread minus number of active threads.
36751c0b2f7Stbbdev     /** If negative, indicates oversubscription.
36851c0b2f7Stbbdev         If positive, indicates that more threads should run.
36951c0b2f7Stbbdev         Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
37051c0b2f7Stbbdev         because raising it impacts the invariant for sleeping threads. */
37151c0b2f7Stbbdev     std::atomic<int> my_slack;
37251c0b2f7Stbbdev 
37351c0b2f7Stbbdev     //! Counter used to determine when to delete this.
37451c0b2f7Stbbdev     std::atomic<int> my_ref_count;
37551c0b2f7Stbbdev 
37651c0b2f7Stbbdev     padded_ipc_worker* my_thread_array;
37751c0b2f7Stbbdev 
37851c0b2f7Stbbdev     //! List of workers that are asleep or committed to sleeping until notified by another thread.
37951c0b2f7Stbbdev     std::atomic<ipc_worker*> my_asleep_list_root;
38051c0b2f7Stbbdev 
38151c0b2f7Stbbdev     //! Protects my_asleep_list_root
38251c0b2f7Stbbdev     typedef scheduler_mutex_type asleep_list_mutex_type;
38351c0b2f7Stbbdev     asleep_list_mutex_type my_asleep_list_mutex;
38451c0b2f7Stbbdev 
38551c0b2f7Stbbdev     //! Whether the server should join its workers during termination
38651c0b2f7Stbbdev     const bool my_join_workers;
38751c0b2f7Stbbdev 
38851c0b2f7Stbbdev     //! Service thread for waking of workers
38951c0b2f7Stbbdev     ipc_waker* my_waker;
39051c0b2f7Stbbdev 
39151c0b2f7Stbbdev     //! Service thread to stop threads
39251c0b2f7Stbbdev     ipc_stopper* my_stopper;
39351c0b2f7Stbbdev 
39451c0b2f7Stbbdev     //! Semaphore that accounts for the active threads
39551c0b2f7Stbbdev     sem_t* my_active_sem;
39651c0b2f7Stbbdev 
39751c0b2f7Stbbdev     //! Semaphore that accounts for the threads requested to stop
39851c0b2f7Stbbdev     sem_t* my_stop_sem;
39951c0b2f7Stbbdev 
40051c0b2f7Stbbdev #if TBB_USE_ASSERT
40151c0b2f7Stbbdev     std::atomic<int> my_net_slack_requests;
40251c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
40351c0b2f7Stbbdev 
40451c0b2f7Stbbdev     //! Wake up to two sleeping workers, if there are any sleeping.
40551c0b2f7Stbbdev     /** The call is used to propagate a chain reaction where each thread wakes up two threads,
40651c0b2f7Stbbdev         which in turn each wake up two threads, etc. */
40751c0b2f7Stbbdev     void propagate_chain_reaction() {
40851c0b2f7Stbbdev         // First test of a double-check idiom.  The second test is inside wake_some().
40951c0b2f7Stbbdev         if( my_slack.load(std::memory_order_acquire)>0 ) {
41051c0b2f7Stbbdev             int active_threads = 0;
41151c0b2f7Stbbdev             if( try_get_active_thread() ) {
41251c0b2f7Stbbdev                 ++active_threads;
41351c0b2f7Stbbdev                 if( try_get_active_thread() ) {
41451c0b2f7Stbbdev                     ++active_threads;
41551c0b2f7Stbbdev                 }
41651c0b2f7Stbbdev                 wake_some( 0, active_threads );
41751c0b2f7Stbbdev             }
41851c0b2f7Stbbdev         }
41951c0b2f7Stbbdev     }
42051c0b2f7Stbbdev 
42151c0b2f7Stbbdev     //! Try to add t to list of sleeping workers
42251c0b2f7Stbbdev     bool try_insert_in_asleep_list(ipc_worker& t);
42351c0b2f7Stbbdev 
42451c0b2f7Stbbdev     //! Try to add t to list of sleeping workers even if there is some work to do
42551c0b2f7Stbbdev     bool try_insert_in_asleep_list_forced(ipc_worker& t);
42651c0b2f7Stbbdev 
42751c0b2f7Stbbdev     //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
42851c0b2f7Stbbdev     void wake_some(int additional_slack, int active_threads);
42951c0b2f7Stbbdev 
43051c0b2f7Stbbdev     //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
43151c0b2f7Stbbdev     void wake_one_forced(int additional_slack);
43251c0b2f7Stbbdev 
43351c0b2f7Stbbdev     //! Stop one thread from the asleep list
43451c0b2f7Stbbdev     bool stop_one();
43551c0b2f7Stbbdev 
43651c0b2f7Stbbdev     //! Wait for active thread
43751c0b2f7Stbbdev     bool wait_active_thread();
43851c0b2f7Stbbdev 
43951c0b2f7Stbbdev     //! Try to get active thread
44051c0b2f7Stbbdev     bool try_get_active_thread();
44151c0b2f7Stbbdev 
44251c0b2f7Stbbdev     //! Release active thread
44351c0b2f7Stbbdev     void release_active_thread();
44451c0b2f7Stbbdev 
44551c0b2f7Stbbdev     //! Wait for thread to stop
44651c0b2f7Stbbdev     bool wait_stop_thread();
44751c0b2f7Stbbdev 
44851c0b2f7Stbbdev     //! Add thread to stop list
44951c0b2f7Stbbdev     void add_stop_thread();
45051c0b2f7Stbbdev 
45151c0b2f7Stbbdev     void remove_server_ref() {
45251c0b2f7Stbbdev         if( --my_ref_count==0 ) {
45351c0b2f7Stbbdev             my_client.acknowledge_close_connection();
45451c0b2f7Stbbdev             this->~ipc_server();
45551c0b2f7Stbbdev             tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
45651c0b2f7Stbbdev         }
45751c0b2f7Stbbdev     }
45851c0b2f7Stbbdev 
45951c0b2f7Stbbdev     friend class ipc_worker;
46051c0b2f7Stbbdev     friend class ipc_waker;
46151c0b2f7Stbbdev     friend class ipc_stopper;
46251c0b2f7Stbbdev public:
46351c0b2f7Stbbdev     ipc_server(tbb_client& client);
46451c0b2f7Stbbdev     virtual ~ipc_server();
46551c0b2f7Stbbdev 
46651c0b2f7Stbbdev     version_type version() const override {
46751c0b2f7Stbbdev         return 0;
46851c0b2f7Stbbdev     }
46951c0b2f7Stbbdev 
47051c0b2f7Stbbdev     void request_close_connection(bool /*exiting*/) override {
47151c0b2f7Stbbdev         my_waker->start_shutdown(false);
47251c0b2f7Stbbdev         my_stopper->start_shutdown(false);
47351c0b2f7Stbbdev         for( size_t i=0; i<my_n_thread; ++i )
47451c0b2f7Stbbdev             my_thread_array[i].start_shutdown( my_join_workers );
47551c0b2f7Stbbdev         remove_server_ref();
47651c0b2f7Stbbdev     }
47751c0b2f7Stbbdev 
47851c0b2f7Stbbdev     void yield() override {d0::yield();}
47951c0b2f7Stbbdev 
48051c0b2f7Stbbdev     void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }
48151c0b2f7Stbbdev 
48251c0b2f7Stbbdev     unsigned default_concurrency() const override { return my_n_thread - 1; }
48351c0b2f7Stbbdev 
48451c0b2f7Stbbdev     void adjust_job_count_estimate(int delta) override;
48551c0b2f7Stbbdev 
48651c0b2f7Stbbdev #if _WIN32||_WIN64
487*b15aabb3Stbbdev     void register_external_thread(::rml::server::execution_resource_t&) override {}
488*b15aabb3Stbbdev     void unregister_external_thread(::rml::server::execution_resource_t) override {}
48951c0b2f7Stbbdev #endif /* _WIN32||_WIN64 */
49051c0b2f7Stbbdev };
49151c0b2f7Stbbdev 
49251c0b2f7Stbbdev //------------------------------------------------------------------------
49351c0b2f7Stbbdev // Methods of ipc_worker
49451c0b2f7Stbbdev //------------------------------------------------------------------------
49551c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
49651c0b2f7Stbbdev     // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
49751c0b2f7Stbbdev     #pragma warning(push)
49851c0b2f7Stbbdev     #pragma warning(disable:4189)
49951c0b2f7Stbbdev #endif
50051c0b2f7Stbbdev #if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
50151c0b2f7Stbbdev // ensure that stack is properly aligned
50251c0b2f7Stbbdev __attribute__((force_align_arg_pointer))
50351c0b2f7Stbbdev #endif
50451c0b2f7Stbbdev __RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
50551c0b2f7Stbbdev     ipc_worker* self = static_cast<ipc_worker*>(arg);
50651c0b2f7Stbbdev     AVOID_64K_ALIASING( self->my_index );
50751c0b2f7Stbbdev     self->run();
50851c0b2f7Stbbdev     return 0;
50951c0b2f7Stbbdev }
51051c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
51151c0b2f7Stbbdev     #pragma warning(pop)
51251c0b2f7Stbbdev #endif
51351c0b2f7Stbbdev 
51451c0b2f7Stbbdev void ipc_worker::release_handle(thread_handle handle, bool join) {
51551c0b2f7Stbbdev     if( join )
51651c0b2f7Stbbdev         ipc_thread_monitor::join( handle );
51751c0b2f7Stbbdev     else
51851c0b2f7Stbbdev         ipc_thread_monitor::detach_thread( handle );
51951c0b2f7Stbbdev }
52051c0b2f7Stbbdev 
52151c0b2f7Stbbdev void ipc_worker::start_shutdown(bool join) {
52251c0b2f7Stbbdev     state_t s = my_state.load(std::memory_order_relaxed);
52351c0b2f7Stbbdev 
52451c0b2f7Stbbdev     do {
52551c0b2f7Stbbdev         __TBB_ASSERT( s!=st_quit, nullptr );
52651c0b2f7Stbbdev     } while( !my_state.compare_exchange_strong( s, st_quit ) );
52751c0b2f7Stbbdev     if( s==st_normal || s==st_starting ) {
52851c0b2f7Stbbdev         // May have invalidated invariant for sleeping, so wake up the thread.
52951c0b2f7Stbbdev         // Note that the notify() here occurs without maintaining invariants for my_slack.
53051c0b2f7Stbbdev         // It does not matter, because my_state==st_quit overrides checking of my_slack.
53151c0b2f7Stbbdev         my_thread_monitor.notify();
53251c0b2f7Stbbdev         // Do not need release handle in st_init state,
53351c0b2f7Stbbdev         // because in this case the thread wasn't started yet.
53451c0b2f7Stbbdev         // For st_starting release is done at launch site.
53551c0b2f7Stbbdev         if( s==st_normal )
53651c0b2f7Stbbdev             release_handle( my_handle, join );
53751c0b2f7Stbbdev     }
53851c0b2f7Stbbdev }
53951c0b2f7Stbbdev 
54051c0b2f7Stbbdev void ipc_worker::start_stopping(bool join) {
54151c0b2f7Stbbdev     state_t s = my_state.load(std::memory_order_relaxed);
54251c0b2f7Stbbdev 
54351c0b2f7Stbbdev     while( !my_state.compare_exchange_strong( s, st_quit ) ) {}
54451c0b2f7Stbbdev     if( s==st_normal || s==st_starting ) {
54551c0b2f7Stbbdev         // May have invalidated invariant for sleeping, so wake up the thread.
54651c0b2f7Stbbdev         // Note that the notify() here occurs without maintaining invariants for my_slack.
54751c0b2f7Stbbdev         // It does not matter, because my_state==st_quit overrides checking of my_slack.
54851c0b2f7Stbbdev         my_thread_monitor.notify();
54951c0b2f7Stbbdev         // Do not need release handle in st_init state,
55051c0b2f7Stbbdev         // because in this case the thread wasn't started yet.
55151c0b2f7Stbbdev         // For st_starting release is done at launch site.
55251c0b2f7Stbbdev         if( s==st_normal )
55351c0b2f7Stbbdev             release_handle( my_handle, join );
55451c0b2f7Stbbdev     }
55551c0b2f7Stbbdev }
55651c0b2f7Stbbdev 
55751c0b2f7Stbbdev void ipc_worker::run() {
55851c0b2f7Stbbdev     my_server.propagate_chain_reaction();
55951c0b2f7Stbbdev 
56051c0b2f7Stbbdev     // Transiting to st_normal here would require setting my_handle,
56151c0b2f7Stbbdev     // which would create race with the launching thread and
56251c0b2f7Stbbdev     // complications in handle management on Windows.
56351c0b2f7Stbbdev 
56451c0b2f7Stbbdev     ::rml::job& j = *my_client.create_one_job();
56551c0b2f7Stbbdev     state_t state = my_state.load(std::memory_order_acquire);
56651c0b2f7Stbbdev     while( state!=st_quit && state!=st_stop ) {
56751c0b2f7Stbbdev         if( my_server.my_slack>=0 ) {
56851c0b2f7Stbbdev             my_client.process(j);
56951c0b2f7Stbbdev         } else {
57051c0b2f7Stbbdev             ipc_thread_monitor::cookie c;
57151c0b2f7Stbbdev             // Prepare to wait
57251c0b2f7Stbbdev             my_thread_monitor.prepare_wait(c);
57351c0b2f7Stbbdev             // Check/set the invariant for sleeping
57451c0b2f7Stbbdev             state = my_state.load(std::memory_order_acquire);
57551c0b2f7Stbbdev             if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
57651c0b2f7Stbbdev                 if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
57751c0b2f7Stbbdev                 my_thread_monitor.commit_wait(c);
57851c0b2f7Stbbdev                 my_server.propagate_chain_reaction();
57951c0b2f7Stbbdev             } else {
58051c0b2f7Stbbdev                 // Invariant broken
58151c0b2f7Stbbdev                 my_thread_monitor.cancel_wait();
58251c0b2f7Stbbdev             }
58351c0b2f7Stbbdev         }
58451c0b2f7Stbbdev         state = my_state.load(std::memory_order_acquire);
58551c0b2f7Stbbdev     }
58651c0b2f7Stbbdev     my_client.cleanup(j);
58751c0b2f7Stbbdev 
58851c0b2f7Stbbdev     my_server.remove_server_ref();
58951c0b2f7Stbbdev }
59051c0b2f7Stbbdev 
59151c0b2f7Stbbdev inline bool ipc_worker::wake_or_launch() {
59251c0b2f7Stbbdev     state_t expected_stop = st_stop, expected_init = st_init;
59351c0b2f7Stbbdev     if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
59451c0b2f7Stbbdev         ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
59551c0b2f7Stbbdev         // after this point, remove_server_ref() must be done by created thread
59651c0b2f7Stbbdev #if USE_WINTHREAD
59751c0b2f7Stbbdev         my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
59851c0b2f7Stbbdev #elif USE_PTHREAD
59951c0b2f7Stbbdev         {
60051c0b2f7Stbbdev         affinity_helper fpa;
60151c0b2f7Stbbdev         fpa.protect_affinity_mask( /*restore_process_mask=*/true );
60251c0b2f7Stbbdev         my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
60351c0b2f7Stbbdev         if( my_handle == 0 ) {
60451c0b2f7Stbbdev             // Unable to create a new thread for the process.
60551c0b2f7Stbbdev             // However, this is an expected situation for the use cases of this coordination server.
60651c0b2f7Stbbdev             state_t s = st_starting;
60751c0b2f7Stbbdev             my_state.compare_exchange_strong( s, st_init );
60851c0b2f7Stbbdev             if (st_starting != s) {
60951c0b2f7Stbbdev                 // Shutdown was requested during startup. my_handle can't be released
61051c0b2f7Stbbdev                 // by start_shutdown, because my_handle might not be set yet
61151c0b2f7Stbbdev                 // at the time of the transition from st_starting to st_quit.
61251c0b2f7Stbbdev                 __TBB_ASSERT( s==st_quit, nullptr );
61351c0b2f7Stbbdev                 release_handle( my_handle, my_server.my_join_workers );
61451c0b2f7Stbbdev             }
61551c0b2f7Stbbdev             return false;
61651c0b2f7Stbbdev         } else {
61751c0b2f7Stbbdev             my_server.my_ref_count++;
61851c0b2f7Stbbdev         }
61951c0b2f7Stbbdev         // Implicit destruction of fpa resets original affinity mask.
62051c0b2f7Stbbdev         }
62151c0b2f7Stbbdev #endif /* USE_PTHREAD */
62251c0b2f7Stbbdev         state_t s = st_starting;
62351c0b2f7Stbbdev         my_state.compare_exchange_strong( s, st_normal );
62451c0b2f7Stbbdev         if( st_starting!=s ) {
62551c0b2f7Stbbdev             // Shutdown was requested during startup. my_handle can't be released
62651c0b2f7Stbbdev             // by start_shutdown, because my_handle might not be set yet
62751c0b2f7Stbbdev             // at the time of the transition from st_starting to st_quit.
62851c0b2f7Stbbdev             __TBB_ASSERT( s==st_quit, nullptr );
62951c0b2f7Stbbdev             release_handle( my_handle, my_server.my_join_workers );
63051c0b2f7Stbbdev         }
63151c0b2f7Stbbdev     }
63251c0b2f7Stbbdev     else {
63351c0b2f7Stbbdev         my_thread_monitor.notify();
63451c0b2f7Stbbdev     }
63551c0b2f7Stbbdev 
63651c0b2f7Stbbdev     return true;
63751c0b2f7Stbbdev }
63851c0b2f7Stbbdev 
63951c0b2f7Stbbdev //------------------------------------------------------------------------
64051c0b2f7Stbbdev // Methods of ipc_waker
64151c0b2f7Stbbdev //------------------------------------------------------------------------
64251c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
64351c0b2f7Stbbdev     // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
64451c0b2f7Stbbdev     #pragma warning(push)
64551c0b2f7Stbbdev     #pragma warning(disable:4189)
64651c0b2f7Stbbdev #endif
64751c0b2f7Stbbdev #if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
64851c0b2f7Stbbdev // ensure that stack is properly aligned
64951c0b2f7Stbbdev __attribute__((force_align_arg_pointer))
65051c0b2f7Stbbdev #endif
65151c0b2f7Stbbdev __RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
65251c0b2f7Stbbdev     ipc_waker* self = static_cast<ipc_waker*>(arg);
65351c0b2f7Stbbdev     AVOID_64K_ALIASING( self->my_index );
65451c0b2f7Stbbdev     self->run();
65551c0b2f7Stbbdev     return 0;
65651c0b2f7Stbbdev }
65751c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
65851c0b2f7Stbbdev     #pragma warning(pop)
65951c0b2f7Stbbdev #endif
66051c0b2f7Stbbdev 
66151c0b2f7Stbbdev void ipc_waker::run() {
66251c0b2f7Stbbdev     // Transiting to st_normal here would require setting my_handle,
66351c0b2f7Stbbdev     // which would create race with the launching thread and
66451c0b2f7Stbbdev     // complications in handle management on Windows.
66551c0b2f7Stbbdev 
66651c0b2f7Stbbdev     while( my_state.load(std::memory_order_acquire)!=st_quit ) {
66751c0b2f7Stbbdev         bool have_to_sleep = false;
66851c0b2f7Stbbdev         if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
66951c0b2f7Stbbdev             if( my_server.wait_active_thread() ) {
67051c0b2f7Stbbdev                 if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
67151c0b2f7Stbbdev                     my_server.wake_some( 0, 1 );
67251c0b2f7Stbbdev                 } else {
67351c0b2f7Stbbdev                     my_server.release_active_thread();
67451c0b2f7Stbbdev                     have_to_sleep = true;
67551c0b2f7Stbbdev                 }
67651c0b2f7Stbbdev             }
67751c0b2f7Stbbdev         } else {
67851c0b2f7Stbbdev             have_to_sleep = true;
67951c0b2f7Stbbdev         }
68051c0b2f7Stbbdev         if( have_to_sleep ) {
68151c0b2f7Stbbdev             ipc_thread_monitor::cookie c;
68251c0b2f7Stbbdev             // Prepare to wait
68351c0b2f7Stbbdev             my_thread_monitor.prepare_wait(c);
68451c0b2f7Stbbdev             // Check/set the invariant for sleeping
68551c0b2f7Stbbdev             if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
68651c0b2f7Stbbdev                 my_thread_monitor.commit_wait(c);
68751c0b2f7Stbbdev             } else {
68851c0b2f7Stbbdev                 // Invariant broken
68951c0b2f7Stbbdev                 my_thread_monitor.cancel_wait();
69051c0b2f7Stbbdev             }
69151c0b2f7Stbbdev         }
69251c0b2f7Stbbdev     }
69351c0b2f7Stbbdev 
69451c0b2f7Stbbdev     my_server.remove_server_ref();
69551c0b2f7Stbbdev }
69651c0b2f7Stbbdev 
69751c0b2f7Stbbdev inline bool ipc_waker::wake_or_launch() {
69851c0b2f7Stbbdev     state_t expected = st_init;
69951c0b2f7Stbbdev     if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
70051c0b2f7Stbbdev         // after this point, remove_server_ref() must be done by created thread
70151c0b2f7Stbbdev #if USE_WINTHREAD
70251c0b2f7Stbbdev         my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
70351c0b2f7Stbbdev #elif USE_PTHREAD
70451c0b2f7Stbbdev         {
70551c0b2f7Stbbdev         affinity_helper fpa;
70651c0b2f7Stbbdev         fpa.protect_affinity_mask( /*restore_process_mask=*/true );
70751c0b2f7Stbbdev         my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
70851c0b2f7Stbbdev         if( my_handle == 0 ) {
70951c0b2f7Stbbdev             runtime_warning( "Unable to create new thread for process %d", getpid() );
71051c0b2f7Stbbdev             state_t s = st_starting;
71151c0b2f7Stbbdev             my_state.compare_exchange_strong(s, st_init);
71251c0b2f7Stbbdev             if (st_starting != s) {
71351c0b2f7Stbbdev                 // Shutdown was requested during startup. my_handle can't be released
71451c0b2f7Stbbdev                 // by start_shutdown, because my_handle might not be set yet
71551c0b2f7Stbbdev                 // at the time of the transition from st_starting to st_quit.
71651c0b2f7Stbbdev                 __TBB_ASSERT( s==st_quit, nullptr );
71751c0b2f7Stbbdev                 release_handle( my_handle, my_server.my_join_workers );
71851c0b2f7Stbbdev             }
71951c0b2f7Stbbdev             return false;
72051c0b2f7Stbbdev         } else {
72151c0b2f7Stbbdev             my_server.my_ref_count++;
72251c0b2f7Stbbdev         }
72351c0b2f7Stbbdev         // Implicit destruction of fpa resets original affinity mask.
72451c0b2f7Stbbdev         }
72551c0b2f7Stbbdev #endif /* USE_PTHREAD */
72651c0b2f7Stbbdev         state_t s = st_starting;
72751c0b2f7Stbbdev         my_state.compare_exchange_strong(s, st_normal);
72851c0b2f7Stbbdev         if( st_starting!=s ) {
72951c0b2f7Stbbdev             // Shutdown was requested during startup. my_handle can't be released
73051c0b2f7Stbbdev             // by start_shutdown, because my_handle might not be set yet
73151c0b2f7Stbbdev             // at the time of the transition from st_starting to st_quit.
73251c0b2f7Stbbdev             __TBB_ASSERT( s==st_quit, nullptr );
73351c0b2f7Stbbdev             release_handle( my_handle, my_server.my_join_workers );
73451c0b2f7Stbbdev         }
73551c0b2f7Stbbdev     }
73651c0b2f7Stbbdev     else {
73751c0b2f7Stbbdev         my_thread_monitor.notify();
73851c0b2f7Stbbdev     }
73951c0b2f7Stbbdev 
74051c0b2f7Stbbdev     return true;
74151c0b2f7Stbbdev }
74251c0b2f7Stbbdev 
74351c0b2f7Stbbdev //------------------------------------------------------------------------
74451c0b2f7Stbbdev // Methods of ipc_stopper
74551c0b2f7Stbbdev //------------------------------------------------------------------------
74651c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
74751c0b2f7Stbbdev     // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
74851c0b2f7Stbbdev     #pragma warning(push)
74951c0b2f7Stbbdev     #pragma warning(disable:4189)
75051c0b2f7Stbbdev #endif
75151c0b2f7Stbbdev #if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
75251c0b2f7Stbbdev // ensure that stack is properly aligned
75351c0b2f7Stbbdev __attribute__((force_align_arg_pointer))
75451c0b2f7Stbbdev #endif
75551c0b2f7Stbbdev __RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
75651c0b2f7Stbbdev     ipc_stopper* self = static_cast<ipc_stopper*>(arg);
75751c0b2f7Stbbdev     AVOID_64K_ALIASING( self->my_index );
75851c0b2f7Stbbdev     self->run();
75951c0b2f7Stbbdev     return 0;
76051c0b2f7Stbbdev }
76151c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
76251c0b2f7Stbbdev     #pragma warning(pop)
76351c0b2f7Stbbdev #endif
76451c0b2f7Stbbdev 
76551c0b2f7Stbbdev void ipc_stopper::run() {
76651c0b2f7Stbbdev     // Transiting to st_normal here would require setting my_handle,
76751c0b2f7Stbbdev     // which would create race with the launching thread and
76851c0b2f7Stbbdev     // complications in handle management on Windows.
76951c0b2f7Stbbdev 
77051c0b2f7Stbbdev     while( my_state.load(std::memory_order_acquire)!=st_quit ) {
77151c0b2f7Stbbdev         if( my_server.wait_stop_thread() ) {
77251c0b2f7Stbbdev             if( my_state.load(std::memory_order_acquire)!=st_quit ) {
77351c0b2f7Stbbdev                 if( !my_server.stop_one() ) {
77451c0b2f7Stbbdev                     my_server.add_stop_thread();
77551c0b2f7Stbbdev                     tbb::detail::r1::prolonged_pause();
77651c0b2f7Stbbdev                 }
77751c0b2f7Stbbdev             }
77851c0b2f7Stbbdev         }
77951c0b2f7Stbbdev     }
78051c0b2f7Stbbdev 
78151c0b2f7Stbbdev     my_server.remove_server_ref();
78251c0b2f7Stbbdev }
78351c0b2f7Stbbdev 
78451c0b2f7Stbbdev inline bool ipc_stopper::wake_or_launch() {
78551c0b2f7Stbbdev     state_t expected = st_init;
78651c0b2f7Stbbdev     if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
78751c0b2f7Stbbdev         // after this point, remove_server_ref() must be done by created thread
78851c0b2f7Stbbdev #if USE_WINTHREAD
78951c0b2f7Stbbdev         my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
79051c0b2f7Stbbdev #elif USE_PTHREAD
79151c0b2f7Stbbdev         {
79251c0b2f7Stbbdev         affinity_helper fpa;
79351c0b2f7Stbbdev         fpa.protect_affinity_mask( /*restore_process_mask=*/true );
79451c0b2f7Stbbdev         my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
79551c0b2f7Stbbdev         if( my_handle == 0 ) {
79651c0b2f7Stbbdev             runtime_warning( "Unable to create new thread for process %d", getpid() );
79751c0b2f7Stbbdev             state_t s = st_starting;
79851c0b2f7Stbbdev             my_state.compare_exchange_strong(s, st_init);
79951c0b2f7Stbbdev             if (st_starting != s) {
80051c0b2f7Stbbdev                 // Shutdown was requested during startup. my_handle can't be released
80151c0b2f7Stbbdev                 // by start_shutdown, because my_handle might not be set yet
80251c0b2f7Stbbdev                 // at the time of the transition from st_starting to st_quit.
80351c0b2f7Stbbdev                 __TBB_ASSERT( s==st_quit, nullptr );
80451c0b2f7Stbbdev                 release_handle( my_handle, my_server.my_join_workers );
80551c0b2f7Stbbdev             }
80651c0b2f7Stbbdev             return false;
80751c0b2f7Stbbdev         } else {
80851c0b2f7Stbbdev             my_server.my_ref_count++;
80951c0b2f7Stbbdev         }
81051c0b2f7Stbbdev         // Implicit destruction of fpa resets original affinity mask.
81151c0b2f7Stbbdev         }
81251c0b2f7Stbbdev #endif /* USE_PTHREAD */
81351c0b2f7Stbbdev         state_t s = st_starting;
81451c0b2f7Stbbdev         my_state.compare_exchange_strong(s, st_normal);
81551c0b2f7Stbbdev         if( st_starting!=s ) {
81651c0b2f7Stbbdev             // Shutdown was requested during startup. my_handle can't be released
81751c0b2f7Stbbdev             // by start_shutdown, because my_handle might not be set yet
81851c0b2f7Stbbdev             // at the time of the transition from st_starting to st_quit.
81951c0b2f7Stbbdev             __TBB_ASSERT( s==st_quit, nullptr );
82051c0b2f7Stbbdev             release_handle( my_handle, my_server.my_join_workers );
82151c0b2f7Stbbdev         }
82251c0b2f7Stbbdev     }
82351c0b2f7Stbbdev     else {
82451c0b2f7Stbbdev         my_thread_monitor.notify();
82551c0b2f7Stbbdev     }
82651c0b2f7Stbbdev 
82751c0b2f7Stbbdev     return true;
82851c0b2f7Stbbdev }
82951c0b2f7Stbbdev 
83051c0b2f7Stbbdev //------------------------------------------------------------------------
83151c0b2f7Stbbdev // Methods of ipc_server
83251c0b2f7Stbbdev //------------------------------------------------------------------------
83351c0b2f7Stbbdev ipc_server::ipc_server(tbb_client& client) :
83451c0b2f7Stbbdev     my_client( client ),
83551c0b2f7Stbbdev     my_stack_size( client.min_stack_size() ),
83651c0b2f7Stbbdev     my_thread_array(nullptr),
83751c0b2f7Stbbdev     my_join_workers(false),
83851c0b2f7Stbbdev     my_waker(nullptr),
83951c0b2f7Stbbdev     my_stopper(nullptr)
84051c0b2f7Stbbdev {
84151c0b2f7Stbbdev     my_ref_count = 1;
84251c0b2f7Stbbdev     my_slack = 0;
84351c0b2f7Stbbdev #if TBB_USE_ASSERT
84451c0b2f7Stbbdev     my_net_slack_requests = 0;
84551c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
84651c0b2f7Stbbdev     my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
84751c0b2f7Stbbdev     if( my_n_thread==0 ) {
84851c0b2f7Stbbdev         my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
84951c0b2f7Stbbdev         __TBB_ASSERT( my_n_thread>0, nullptr );
85051c0b2f7Stbbdev     }
85151c0b2f7Stbbdev 
85251c0b2f7Stbbdev     my_asleep_list_root = nullptr;
85351c0b2f7Stbbdev     my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
85451c0b2f7Stbbdev     for( size_t i=0; i<my_n_thread; ++i ) {
85551c0b2f7Stbbdev         ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
85651c0b2f7Stbbdev         t->my_next = my_asleep_list_root;
85751c0b2f7Stbbdev         my_asleep_list_root = t;
85851c0b2f7Stbbdev     }
85951c0b2f7Stbbdev 
86051c0b2f7Stbbdev     my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
86151c0b2f7Stbbdev     new( my_waker ) ipc_waker( *this, client, my_n_thread );
86251c0b2f7Stbbdev 
86351c0b2f7Stbbdev     my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
86451c0b2f7Stbbdev     new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );
86551c0b2f7Stbbdev 
86651c0b2f7Stbbdev     char* active_sem_name = get_active_sem_name();
86751c0b2f7Stbbdev     my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
86851c0b2f7Stbbdev     __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
86951c0b2f7Stbbdev     delete[] active_sem_name;
87051c0b2f7Stbbdev 
87151c0b2f7Stbbdev     char* stop_sem_name = get_stop_sem_name();
87251c0b2f7Stbbdev     my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
87351c0b2f7Stbbdev     __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
87451c0b2f7Stbbdev     delete[] stop_sem_name;
87551c0b2f7Stbbdev }
87651c0b2f7Stbbdev 
87751c0b2f7Stbbdev ipc_server::~ipc_server() {
87851c0b2f7Stbbdev     __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );
87951c0b2f7Stbbdev 
88051c0b2f7Stbbdev     for( size_t i=my_n_thread; i--; )
88151c0b2f7Stbbdev         my_thread_array[i].~padded_ipc_worker();
88251c0b2f7Stbbdev     tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
88351c0b2f7Stbbdev     tbb::detail::d0::poison_pointer( my_thread_array );
88451c0b2f7Stbbdev 
88551c0b2f7Stbbdev     my_waker->~ipc_waker();
88651c0b2f7Stbbdev     tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
88751c0b2f7Stbbdev     tbb::detail::d0::poison_pointer( my_waker );
88851c0b2f7Stbbdev 
88951c0b2f7Stbbdev     my_stopper->~ipc_stopper();
89051c0b2f7Stbbdev     tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
89151c0b2f7Stbbdev     tbb::detail::d0::poison_pointer( my_stopper );
89251c0b2f7Stbbdev 
89351c0b2f7Stbbdev     sem_close( my_active_sem );
89451c0b2f7Stbbdev     sem_close( my_stop_sem );
89551c0b2f7Stbbdev }
89651c0b2f7Stbbdev 
89751c0b2f7Stbbdev inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
89851c0b2f7Stbbdev     asleep_list_mutex_type::scoped_lock lock;
89951c0b2f7Stbbdev     if( !lock.try_acquire( my_asleep_list_mutex ) )
90051c0b2f7Stbbdev         return false;
90151c0b2f7Stbbdev     // Contribute to slack under the lock so that if another thread takes that unit of slack,
90251c0b2f7Stbbdev     // it sees us sleeping on the list and wakes us up.
90351c0b2f7Stbbdev     int k = ++my_slack;
90451c0b2f7Stbbdev     if( k<=0 ) {
90551c0b2f7Stbbdev         t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
90651c0b2f7Stbbdev         my_asleep_list_root.store(&t, std::memory_order_relaxed);
90751c0b2f7Stbbdev         return true;
90851c0b2f7Stbbdev     } else {
90951c0b2f7Stbbdev         --my_slack;
91051c0b2f7Stbbdev         return false;
91151c0b2f7Stbbdev     }
91251c0b2f7Stbbdev }
91351c0b2f7Stbbdev 
91451c0b2f7Stbbdev inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
91551c0b2f7Stbbdev     asleep_list_mutex_type::scoped_lock lock;
91651c0b2f7Stbbdev     if( !lock.try_acquire( my_asleep_list_mutex ) )
91751c0b2f7Stbbdev         return false;
91851c0b2f7Stbbdev     // Contribute to slack under the lock so that if another thread takes that unit of slack,
91951c0b2f7Stbbdev     // it sees us sleeping on the list and wakes us up.
92051c0b2f7Stbbdev     ++my_slack;
92151c0b2f7Stbbdev     t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
92251c0b2f7Stbbdev     my_asleep_list_root.store(&t, std::memory_order_relaxed);
92351c0b2f7Stbbdev     return true;
92451c0b2f7Stbbdev }
92551c0b2f7Stbbdev 
92651c0b2f7Stbbdev inline bool ipc_server::wait_active_thread() {
92751c0b2f7Stbbdev     if( sem_wait( my_active_sem ) == 0 ) {
92851c0b2f7Stbbdev         ++my_global_thread_count;
92951c0b2f7Stbbdev         return true;
93051c0b2f7Stbbdev     }
93151c0b2f7Stbbdev     return false;
93251c0b2f7Stbbdev }
93351c0b2f7Stbbdev 
93451c0b2f7Stbbdev inline bool ipc_server::try_get_active_thread() {
93551c0b2f7Stbbdev     if( sem_trywait( my_active_sem ) == 0 ) {
93651c0b2f7Stbbdev         ++my_global_thread_count;
93751c0b2f7Stbbdev         return true;
93851c0b2f7Stbbdev     }
93951c0b2f7Stbbdev     return false;
94051c0b2f7Stbbdev }
94151c0b2f7Stbbdev 
94251c0b2f7Stbbdev inline void ipc_server::release_active_thread() {
94351c0b2f7Stbbdev     release_thread_sem( my_active_sem );
94451c0b2f7Stbbdev }
94551c0b2f7Stbbdev 
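// Waits up to roughly one second for a stop request on the stop semaphore,
// so the stopper thread periodically re-checks its own st_quit state.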
94651c0b2f7Stbbdev inline bool ipc_server::wait_stop_thread() {
94751c0b2f7Stbbdev     struct timespec ts;
94851c0b2f7Stbbdev     if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
94951c0b2f7Stbbdev         ts.tv_sec++;
95051c0b2f7Stbbdev         if( sem_timedwait( my_stop_sem, &ts )==0 ) {
95151c0b2f7Stbbdev             return true;
95251c0b2f7Stbbdev         }
95351c0b2f7Stbbdev     }
95451c0b2f7Stbbdev     return false;
95551c0b2f7Stbbdev }
95651c0b2f7Stbbdev 
95751c0b2f7Stbbdev inline void ipc_server::add_stop_thread() {
95851c0b2f7Stbbdev     sem_post( my_stop_sem );
95951c0b2f7Stbbdev }
96051c0b2f7Stbbdev 
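// Wakes up to two sleeping workers.  Each wake-up consumes one unit of slack
// and one of the active-thread slots acquired by the caller; leftover slots
// are returned via release_active_thread(), and a worker that fails to launch
// is pushed back onto the asleep list with a stop request queued instead.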
96151c0b2f7Stbbdev void ipc_server::wake_some( int additional_slack, int active_threads ) {
96251c0b2f7Stbbdev     __TBB_ASSERT( additional_slack>=0, nullptr );
96351c0b2f7Stbbdev     ipc_worker* wakee[2];
96451c0b2f7Stbbdev     ipc_worker **w = wakee;
96551c0b2f7Stbbdev     {
96651c0b2f7Stbbdev         asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
96751c0b2f7Stbbdev         while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
96851c0b2f7Stbbdev             if( additional_slack>0 ) {
96951c0b2f7Stbbdev                 if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
97051c0b2f7Stbbdev                     break;
97151c0b2f7Stbbdev                 --additional_slack;
97251c0b2f7Stbbdev             } else {
97351c0b2f7Stbbdev                 // Chain reaction; Try to claim unit of slack
97451c0b2f7Stbbdev                 int old;
97551c0b2f7Stbbdev                 do {
97651c0b2f7Stbbdev                     old = my_slack.load(std::memory_order_relaxed);
97751c0b2f7Stbbdev                     if( old<=0 ) goto done;
97851c0b2f7Stbbdev                 } while( !my_slack.compare_exchange_strong( old, old-1 ) );
97951c0b2f7Stbbdev             }
98051c0b2f7Stbbdev             // Pop sleeping worker to combine with claimed unit of slack
98151c0b2f7Stbbdev             my_asleep_list_root.store(
98251c0b2f7Stbbdev                 (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
98351c0b2f7Stbbdev                 std::memory_order_relaxed
98451c0b2f7Stbbdev             );
98551c0b2f7Stbbdev             --active_threads;
98651c0b2f7Stbbdev         }
98751c0b2f7Stbbdev         if( additional_slack ) {
98851c0b2f7Stbbdev             // Contribute our unused slack to my_slack.
98951c0b2f7Stbbdev             my_slack += additional_slack;
99051c0b2f7Stbbdev         }
99151c0b2f7Stbbdev     }
99251c0b2f7Stbbdev done:
99351c0b2f7Stbbdev     while( w>wakee ) {
99451c0b2f7Stbbdev         if( !(*--w)->wake_or_launch() ) {
99551c0b2f7Stbbdev             add_stop_thread();
99651c0b2f7Stbbdev             do {
99751c0b2f7Stbbdev             } while( !try_insert_in_asleep_list_forced(**w) );
99851c0b2f7Stbbdev             release_active_thread();
99951c0b2f7Stbbdev         }
100051c0b2f7Stbbdev     }
100151c0b2f7Stbbdev     while( active_threads ) {
100251c0b2f7Stbbdev         release_active_thread();
100351c0b2f7Stbbdev         --active_threads;
100451c0b2f7Stbbdev     }
100551c0b2f7Stbbdev }
100651c0b2f7Stbbdev 
100751c0b2f7Stbbdev void ipc_server::wake_one_forced( int additional_slack ) {
100851c0b2f7Stbbdev     __TBB_ASSERT( additional_slack>=0, nullptr );
100951c0b2f7Stbbdev     ipc_worker* wakee[1];
101051c0b2f7Stbbdev     ipc_worker **w = wakee;
101151c0b2f7Stbbdev     {
101251c0b2f7Stbbdev         asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
101351c0b2f7Stbbdev         while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
101451c0b2f7Stbbdev             if( additional_slack>0 ) {
101551c0b2f7Stbbdev                 if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
101651c0b2f7Stbbdev                     break;
101751c0b2f7Stbbdev                 --additional_slack;
101851c0b2f7Stbbdev             } else {
101951c0b2f7Stbbdev                 // Chain reaction; Try to claim unit of slack
102051c0b2f7Stbbdev                 int old;
102151c0b2f7Stbbdev                 do {
102251c0b2f7Stbbdev                     old = my_slack.load(std::memory_order_relaxed);
102351c0b2f7Stbbdev                     if( old<=0 ) goto done;
102451c0b2f7Stbbdev                 } while( !my_slack.compare_exchange_strong( old, old-1 ) );
102551c0b2f7Stbbdev             }
102651c0b2f7Stbbdev             // Pop sleeping worker to combine with claimed unit of slack
102751c0b2f7Stbbdev             my_asleep_list_root.store(
102851c0b2f7Stbbdev                 (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
102951c0b2f7Stbbdev                 std::memory_order_relaxed);
103051c0b2f7Stbbdev         }
103151c0b2f7Stbbdev         if( additional_slack ) {
103251c0b2f7Stbbdev             // Contribute our unused slack to my_slack.
103351c0b2f7Stbbdev             my_slack += additional_slack;
103451c0b2f7Stbbdev         }
103551c0b2f7Stbbdev     }
103651c0b2f7Stbbdev done:
103751c0b2f7Stbbdev     while( w>wakee ) {
103851c0b2f7Stbbdev         if( !(*--w)->wake_or_launch() ) {
103951c0b2f7Stbbdev             add_stop_thread();
104051c0b2f7Stbbdev             do {
104151c0b2f7Stbbdev             } while( !try_insert_in_asleep_list_forced(**w) );
104251c0b2f7Stbbdev         }
104351c0b2f7Stbbdev     }
104451c0b2f7Stbbdev }
104551c0b2f7Stbbdev 
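// Picks a sleeping worker that is still in st_normal state (walking as deep
// into the asleep list as the leading run of st_normal workers extends) and
// asks it to stop; returns false if the list is empty or its head is not in
// st_normal state.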
104651c0b2f7Stbbdev bool ipc_server::stop_one() {
104751c0b2f7Stbbdev     ipc_worker* current = nullptr;
104851c0b2f7Stbbdev     ipc_worker* next = nullptr;
104951c0b2f7Stbbdev     {
105051c0b2f7Stbbdev         asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
105151c0b2f7Stbbdev         if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
105251c0b2f7Stbbdev             current = my_asleep_list_root.load(std::memory_order_relaxed);
105351c0b2f7Stbbdev             if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
105451c0b2f7Stbbdev                 next = current->my_next;
105551c0b2f7Stbbdev                 while( next!= nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
105651c0b2f7Stbbdev                     current = next;
105751c0b2f7Stbbdev                     next = current->my_next;
105851c0b2f7Stbbdev                 }
105951c0b2f7Stbbdev                 current->start_stopping( my_join_workers );
106051c0b2f7Stbbdev                 return true;
106151c0b2f7Stbbdev             }
106251c0b2f7Stbbdev         }
106351c0b2f7Stbbdev     }
106451c0b2f7Stbbdev     return false;
106551c0b2f7Stbbdev }
106651c0b2f7Stbbdev 
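// delta>0 means the client can employ more threads, delta<0 means fewer.  For
// a multi-threaded server up to two active-thread slots are acquired here and
// handed to wake_some(); the waker and stopper service threads are also woken
// or launched on every positive adjustment.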
106751c0b2f7Stbbdev void ipc_server::adjust_job_count_estimate( int delta ) {
106851c0b2f7Stbbdev #if TBB_USE_ASSERT
106951c0b2f7Stbbdev     my_net_slack_requests+=delta;
107051c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
107151c0b2f7Stbbdev     if( my_n_thread > 1 ) {
107251c0b2f7Stbbdev         if( delta<0 ) {
107351c0b2f7Stbbdev             my_slack+=delta;
107451c0b2f7Stbbdev         } else if( delta>0 ) {
107551c0b2f7Stbbdev             int active_threads = 0;
107651c0b2f7Stbbdev             if( try_get_active_thread() ) {
107751c0b2f7Stbbdev                 ++active_threads;
107851c0b2f7Stbbdev                 if( try_get_active_thread() ) {
107951c0b2f7Stbbdev                     ++active_threads;
108051c0b2f7Stbbdev                 }
108151c0b2f7Stbbdev             }
108251c0b2f7Stbbdev             wake_some( delta, active_threads );
108351c0b2f7Stbbdev 
108451c0b2f7Stbbdev             if( !my_waker->wake_or_launch() ) {
108551c0b2f7Stbbdev                 add_stop_thread();
108651c0b2f7Stbbdev             }
108751c0b2f7Stbbdev             if( !my_stopper->wake_or_launch() ) {
108851c0b2f7Stbbdev                 add_stop_thread();
108951c0b2f7Stbbdev             }
109051c0b2f7Stbbdev         }
109151c0b2f7Stbbdev     } else { // Corner case: RML should not provide any worker threads, but the client still needs at least one
109251c0b2f7Stbbdev         if( delta<0 ) {
109351c0b2f7Stbbdev             my_slack += delta;
109451c0b2f7Stbbdev         } else {
109551c0b2f7Stbbdev             wake_one_forced( delta );
109651c0b2f7Stbbdev         }
109751c0b2f7Stbbdev     }
109851c0b2f7Stbbdev }
109951c0b2f7Stbbdev 
110051c0b2f7Stbbdev //------------------------------------------------------------------------
110151c0b2f7Stbbdev // RML factory methods
110251c0b2f7Stbbdev //------------------------------------------------------------------------
110351c0b2f7Stbbdev 
110451c0b2f7Stbbdev #if USE_PTHREAD
110551c0b2f7Stbbdev 
110651c0b2f7Stbbdev static tbb_client* my_global_client = nullptr;
110751c0b2f7Stbbdev static tbb_server* my_global_server = nullptr;
110851c0b2f7Stbbdev 
110951c0b2f7Stbbdev void rml_atexit() {
111051c0b2f7Stbbdev     release_resources();
111151c0b2f7Stbbdev }
111251c0b2f7Stbbdev 
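// After fork() the child rebuilds the server object in place over the old one
// and re-registers the fork and exit hooks: the parent's worker threads do not
// exist in the child, so the server state has to be reconstructed.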
111351c0b2f7Stbbdev void rml_atfork_child() {
111451c0b2f7Stbbdev     if( my_global_server!=nullptr && my_global_client!=nullptr ) {
111551c0b2f7Stbbdev         ipc_server* server = static_cast<ipc_server*>( my_global_server );
111651c0b2f7Stbbdev         server->~ipc_server();
111751c0b2f7Stbbdev         // memset( server, 0, sizeof(ipc_server) );
111851c0b2f7Stbbdev         new( server ) ipc_server( *my_global_client );
111951c0b2f7Stbbdev         pthread_atfork( nullptr, nullptr, rml_atfork_child );
112051c0b2f7Stbbdev         atexit( rml_atexit );
112151c0b2f7Stbbdev     }
112251c0b2f7Stbbdev }
112351c0b2f7Stbbdev 
112451c0b2f7Stbbdev #endif /* USE_PTHREAD */
112551c0b2f7Stbbdev 
112651c0b2f7Stbbdev extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
112751c0b2f7Stbbdev     server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
112851c0b2f7Stbbdev #if USE_PTHREAD
112951c0b2f7Stbbdev     my_global_client = &client;
113051c0b2f7Stbbdev     my_global_server = server;
113151c0b2f7Stbbdev     pthread_atfork( nullptr, nullptr, rml_atfork_child );
113251c0b2f7Stbbdev     atexit( rml_atexit );
113351c0b2f7Stbbdev #endif /* USE_PTHREAD */
113451c0b2f7Stbbdev     if( getenv( "RML_DEBUG" ) ) {
113551c0b2f7Stbbdev         runtime_warning("IPC server is started");
113651c0b2f7Stbbdev     }
113751c0b2f7Stbbdev     return tbb_factory::st_success;
113851c0b2f7Stbbdev }
113951c0b2f7Stbbdev 
114051c0b2f7Stbbdev extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
114151c0b2f7Stbbdev }
114251c0b2f7Stbbdev 
114351c0b2f7Stbbdev } // namespace rml
114451c0b2f7Stbbdev } // namespace detail
114551c0b2f7Stbbdev 
114651c0b2f7Stbbdev } // namespace tbb
1147