// xref: /oneTBB/python/rml/ipc_server.cpp (revision 51c0b2f7)
/*
    Copyright (c) 2017-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"
#include <fcntl.h>
#include <stdlib.h>
#include <semaphore.h> // sem_open, sem_wait, sem_timedwait, sem_post, sem_unlink
#include <errno.h>     // errno, ENOENT
#include <time.h>      // clock_gettime, timespec
#include <unistd.h>    // getpid

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) {
            pthread_attr_destroy( &s );
            return 0;
        }
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) {
        pthread_attr_destroy( &s );
        return 0;
    }
    // The thread is already running at this point, so a failing
    // pthread_attr_destroy must not discard the valid handle.
    pthread_attr_destroy( &s );
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;

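//! Number of units of the shared active-threads semaphore currently held by
//! this process: incremented by wait_active_thread()/try_get_active_thread()
//! below and drained back via release_thread_sem() and release_resources().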
static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len+1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}
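// Example: exporting IPC_ACTIVE_SEMAPHORE=/my_app_active makes
// get_active_sem_name() below return a heap-allocated copy of that string;
// otherwise a shared default name is derived from IPC_ACTIVE_SEM_PREFIX.
// In both branches the caller owns the result and must delete[] it.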

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

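// Return one held unit to the given semaphore. The CAS loop first claims a
// unit from my_global_thread_count and only then posts the semaphore, so the
// process-local count never goes below zero.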
static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    std::strncpy(templ, prefix, plen+1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    std::strncat(templ, postfix, xlen + 1);
    // strlen does not count the terminating null character.
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // O_CREAT requires the mode and initial-value arguments;
        // both are ignored when the semaphore already exists.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Can not get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            delete[] sem_name; // do not leak the name on the error path
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning( "Can not get RML semaphore name" );
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            delete[] sem_name; // do not leak the name on the error path
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                    /----------stop---\
                    |           ^     |
                    V           |     |
        init --> starting --> normal  |
          |         |           |     |
          |         V           |     |
          \------> quit <-------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

//TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
    : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

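//! Service worker that waits for units of the shared active-threads semaphore
//! and wakes sleeping workers while the server has positive slack.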
class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
    : padded_ipc_worker( server, client, i ) {}
};

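//! Service worker that waits on the shared stop semaphore and serves each
//! posted request by stopping one sleeping worker.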
class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
    : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should join worker threads during termination
    const bool my_join_workers;

    //! Service thread for waking workers
    ipc_waker* my_waker;

    //! Service thread for stopping workers
    ipc_stopper* my_stopper;

    //! Semaphore that accounts for active threads across processes
    sem_t* my_active_sem;

    //! Semaphore that accounts for pending stop requests
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom.  Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from asleep list
    bool stop_one();

    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override {d0::yield();}

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_master(::rml::server::execution_resource_t&) override {}
    void unregister_master(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    // Transition to st_stop rather than st_quit: a stopped worker can be
    // relaunched later by wake_or_launch().
    while( !my_state.compare_exchange_strong( s, st_stop ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_stop overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_acquire);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.commit_wait(c);
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
        state = my_state.load(std::memory_order_acquire);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            // Unable to create new thread for process
            // However, this is expected situation for the use cases of this coordination server
            state_t s = st_starting;
            my_state.compare_exchange_strong( s, st_init );
            if (st_starting != s) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.commit_wait(c);
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            runtime_warning( "Unable to create new thread for process %d", getpid() );
            state_t s = st_starting;
            my_state.compare_exchange_strong(s, st_init);
            if (st_starting != s) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            runtime_warning( "Unable to create new thread for process %d", getpid() );
            state_t s = st_starting;
            my_state.compare_exchange_strong(s, st_init);
            if (st_starting != s) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

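    // Note: sem_open's mode and initial value take effect only in the process
    // that actually creates the named semaphore; later opens attach to the
    // existing object. On creation the active-threads semaphore holds
    // my_n_thread-1 units and the stop semaphore holds none.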
    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

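// Wait up to one second for a stop request posted to the shared stop
// semaphore; returns true only if a request was actually consumed.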
inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed);
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!= nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack+=delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case when RML shouldn't provide any worker thread but client has to have at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

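// After fork() only the forking thread survives in the child, so the server
// is re-constructed in place over the same storage: the child gets a fresh
// worker pool while pointers held elsewhere stay valid. The fork and exit
// hooks are registered anew for the re-created instance.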
void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb