/*
    Copyright (c) 2017-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <stdlib.h>
// POSIX headers for the semaphore, errno, clock, and process APIs used below
// (sem_open/sem_wait/sem_post/sem_unlink, errno, clock_gettime, getpid);
// listed explicitly rather than relying on transitive includes.
#include <errno.h>
#include <semaphore.h>
#include <time.h>
#include <unistd.h>

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}
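// Note: replacing the factory's library handle with the c_dont_unload
// sentinel prevents the RML client from ever unloading this module. The
// module has to stay mapped because IPC worker threads created below may
// keep executing its code past the lifetime of any single connection.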

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) {
            pthread_attr_destroy( &s );
            return 0;
        }
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) {
        pthread_attr_destroy( &s );
        return 0;
    }
    // The thread is already running; do not discard a valid handle just
    // because destroying the attribute object failed.
    pthread_attr_destroy( &s );
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;

static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len+1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}
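// Note: the counter is decremented with a CAS loop before sem_post so that
// each post is backed by a token previously taken via sem_wait/sem_trywait.
// Concurrent callers therefore cannot post the semaphore more times than
// this process actually holds tokens.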

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    strncpy(templ, prefix, plen+1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    strncat(templ, postfix, xlen + 1);
    // The buffer holds plen + xlen characters plus the terminator.
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // POSIX requires the mode and initial-value arguments whenever
        // O_CREAT is passed to sem_open.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}
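// Note: release_resources() runs at process exit (see rml_atexit below) and
// returns every active-thread token this process still holds, so that other
// processes sharing the named semaphore are not starved of slots.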

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Cannot get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Cannot get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                    /----------stop---\
                    |           ^     |
                    V           |     |
        init --> starting --> normal  |
          |         |           |     |
          |         V           |     |
          \------> quit <-------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

//TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
    : ipc_worker( server, client, i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
    : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
    : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should wait for workers to join during termination
    const bool my_join_workers;

    //! Service thread that wakes up workers
    ipc_waker* my_waker;

    //! Service thread that stops workers
    ipc_stopper* my_stopper;

    //! Semaphore used to account for active threads
    sem_t* my_active_sem;

    //! Semaphore used to account for pending stop requests
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom.  Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }
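    // Note: each wakeup is funded by a token taken from the process-shared
    // active-threads semaphore via try_get_active_thread(); wake_some()
    // either spends the tokens on sleeping workers or returns them through
    // release_active_thread(), so the global concurrency budget is preserved.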

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from asleep list
    bool stop_one();

    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }
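    // Note: the reference count starts at 1 for the connection itself and is
    // incremented once per successfully launched thread; whichever thread or
    // call drops it to zero acknowledges the close and frees the server
    // through the same cache_aligned_allocator that allocated it.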

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    // Unlike start_shutdown(), the target state is st_stop, so the thread
    // can be resurrected later via wake_or_launch().
    while( !my_state.compare_exchange_strong( s, st_stop ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_stop overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_seq_cst);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.wait();
                my_server.propagate_chain_reaction();
            }
        }
        // memory_order_seq_cst to be strictly ordered after thread_monitor::wait
        state = my_state.load(std::memory_order_seq_cst);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}
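// Note: before blocking, a worker both enqueues itself on the asleep list
// (which raises my_slack under the lock) and returns its active-thread token,
// so another process sharing the semaphore can immediately reuse the slot.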

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            // Unable to create a new thread for the process.
            // However, this is an expected situation for the use cases of this coordination server.
            state_t s = st_starting;
            my_state.compare_exchange_strong( s, st_init );
            if( st_starting!=s ) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}
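// Note: a false return means no OS thread could be created for this worker.
// Callers such as wake_some() react by re-posting a stop request and forcing
// the worker back onto the asleep list so it can be retried later.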

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            // Check/set the invariant for sleeping
            if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.wait();
            }
        }
    }

    my_server.remove_server_ref();
}
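// Note: the waker exists so that no client-facing call ever blocks on the
// active-threads semaphore: adjust_job_count_estimate() only uses the
// non-blocking try_get_active_thread(), while this service thread performs
// the blocking wait_active_thread() and spends the token via wake_some().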

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            runtime_warning( "Unable to create new thread for process %d", getpid() );
            state_t s = st_starting;
            my_state.compare_exchange_strong( s, st_init );
            if( st_starting!=s ) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}
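// Note: stop requests arrive as posts on my_stop_sem. If stop_one() finds no
// worker that is safely in st_normal, the request is re-posted and retried
// after prolonged_pause(), so a request is never silently dropped.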

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            runtime_warning( "Unable to create new thread for process %d", getpid() );
            state_t s = st_starting;
            my_state.compare_exchange_strong( s, st_init );
            if( st_starting!=s ) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}
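// Note: the one-second timeout on sem_timedwait lets the stopper thread
// periodically fall out of the wait and recheck my_state for st_quit, so the
// server can shut down even when no stop requests are ever posted.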

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed);
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!=nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}
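// Note: the walk advances past consecutive st_normal workers and stops the
// last of them, i.e. the sleeper deepest in the list; presumably this targets
// the worker that has been asleep the longest while leaving the most recently
// parked workers available for a quick wakeup.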

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack += delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case: the server should not provide any worker threads, but the client still needs at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}
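// Note: on a positive delta the call grabs at most two active-thread tokens
// without blocking and hands them to wake_some(); any demand that cannot be
// funded immediately is left in my_slack for the waker service thread, which
// may block on the semaphore. The single-thread corner case bypasses the
// semaphore accounting entirely via wake_one_forced().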

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */
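// Note: after fork() the child reconstructs the server in place over the
// inherited object and re-registers the atfork and atexit hooks so that
// further forks of the child get the same treatment.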

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb