xref: /oneTBB/python/rml/ipc_server.cpp (revision 6caecf96)
/*
    Copyright (c) 2017-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

#include <fcntl.h>
#include <stdlib.h>

namespace rml {
namespace internal {

static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}
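
// Note: pinning the library via c_dont_unload keeps it mapped for the whole
// process lifetime. This is presumably required because the IPC server below
// registers atexit()/pthread_atfork() handlers and opens process-wide
// semaphores that must remain valid even after the factory is closed.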

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 && pthread_attr_setstacksize( &s, stack_size ) ) {
        pthread_attr_destroy( &s );
        return 0;
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) {
        pthread_attr_destroy( &s );
        return 0;
    }
    // The attribute object is no longer needed once the thread is running;
    // a failure to destroy it must not discard the valid thread handle.
    pthread_attr_destroy( &s );
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;

static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len+1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}
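
// Note: both branches above return a heap-allocated string, so every caller
// owns the result and releases it with delete[] (as the callers below do).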

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}
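
// The CAS loop above decrements the global thread count only while it is
// positive, so each successful call posts the semaphore exactly once and the
// count can never go negative.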

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    std::strncpy(templ, prefix, plen+1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    std::strncat(templ, postfix, xlen + 1);
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}
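
// The generated name is published via setenv() so that cooperating processes
// reading the same environment variable open the same POSIX semaphore.
// mktemp() only makes the name unique at generation time; hence the TODO
// above about mkstemp().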

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // POSIX requires the mode and initial-value arguments whenever O_CREAT is passed.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}
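
// release_resources() drains the credits this process still holds: it posts
// the shared semaphore once per accounted thread until the global count
// reaches zero, handing those credits back to other processes at exit.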

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Cannot get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning( "Cannot get RML semaphore name" );
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Cannot release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                    /----------stop---\
                    |           ^     |
                    V           |     |
        init --> starting --> normal  |
          |         |           |     |
          |         V           |     |
          \------> quit <-------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

//TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
    : ipc_worker( server,client,i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
    : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
    : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should wait for workers to finish during termination
    const bool my_join_workers;

    //! Service thread for waking up workers
    ipc_waker* my_waker;

    //! Service thread for stopping threads
    ipc_stopper* my_stopper;

    //! Semaphore for accounting active threads
    sem_t* my_active_sem;

    //! Semaphore for accounting threads that should stop
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom.  Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }
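
    // The pairwise wakeup above gives a binary chain reaction: each woken
    // worker wakes up to two more, so wakeup latency grows logarithmically
    // with the number of sleeping workers, while the active-thread semaphore
    // caps how many credits this process claims at a time.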

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from asleep list
    bool stop_one();

    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override {d0::yield();}

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};
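
// Lifetime note: an ipc_server is constructed in __TBB_make_rml_server() below
// and destroys itself in remove_server_ref() once the reference count (one for
// the connection plus one per launched service/worker thread) drops to zero.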

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    while( !my_state.compare_exchange_strong( s, st_quit ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, release is done at the launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}
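
// start_shutdown() and start_stopping() differ only in strictness: shutdown
// asserts the worker is not already in st_quit, while stopping tolerates any
// prior state, presumably because the stopper can race with a concurrent
// shutdown of the same worker.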

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_acquire);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.commit_wait(c);
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
        state = my_state.load(std::memory_order_acquire);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}
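
// A worker alternates between processing client jobs while slack is
// non-negative and sleeping otherwise. Before committing to sleep it returns
// its active-thread credit to the shared semaphore (when more than one thread
// is configured) so that workers in other processes can run in its place.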

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            // Unable to create a new thread for the process.
            // However, this is an expected situation for the use cases of this coordination server.
            state_t s = st_starting;
            my_state.compare_exchange_strong( s, st_init );
            if (st_starting != s) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because the my_handle value might not be set yet
                // at the time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}
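
// Contract: a false return means no OS thread could be created for this
// worker. The caller (see wake_some below) then reinserts the worker into the
// asleep list and posts a stop request so that accounting stays balanced.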

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            ipc_thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.commit_wait(c);
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }

    my_server.remove_server_ref();
}
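
// The waker is a dedicated service thread: while slack signals unmet demand,
// it blocks on the shared active-thread semaphore and converts each credit it
// acquires into a worker wakeup, bridging demand in this process with credits
// released by other processes.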

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            runtime_warning( "Unable to create new thread for process %d", getpid() );
            state_t s = st_starting;
            my_state.compare_exchange_strong(s, st_init);
            if (st_starting != s) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because the my_handle value might not be set yet
                // at the time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}
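
// The stopper services requests posted to the stop semaphore. If no worker
// can currently be stopped, the request is re-posted and the thread backs off
// with prolonged_pause() instead of spinning on the semaphore.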

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        if( my_handle == 0 ) {
            runtime_warning( "Unable to create new thread for process %d", getpid() );
            state_t s = st_starting;
            my_state.compare_exchange_strong(s, st_init);
            if (st_starting != s) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because the my_handle value might not be set yet
                // at the time of transition from st_starting to st_quit.
                __TBB_ASSERT( s==st_quit, nullptr );
                release_handle( my_handle, my_server.my_join_workers );
            }
            return false;
        } else {
            my_server.my_ref_count++;
        }
        // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong(s, st_normal);
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}
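
// Note: the active-thread semaphore starts at my_n_thread - 1, matching
// default_concurrency() above; presumably one hardware slot is reserved for
// the application's own master thread.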

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}
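
// The one-second timeout on sem_timedwait() keeps the stopper responsive:
// even with no stop requests pending, its loop re-checks my_state about once
// per second and can exit soon after st_quit is set.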

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}
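
// Wakeups happen outside the lock. If wake_or_launch() fails (no OS thread
// could be created), the worker is forcibly returned to the asleep list, a
// stop request is posted, and the claimed active-thread credit is released;
// any credits left unused are returned in the final loop.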

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; Try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed);
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!= nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack+=delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case when RML shouldn't provide any worker thread but client has to have at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}
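
// After fork(), the child destroys the server state inherited from the parent
// and constructs a fresh ipc_server in the same storage, then registers the
// atexit/atfork hooks again on behalf of the new instance.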

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb
1147