/*
    Copyright (c) 2017-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>
#include <cstring>
#include <cstdlib>

#include "../../src/tbb/rml_tbb.h"
#include "../../src/tbb/rml_thread_monitor.h"
#include "../../src/tbb/scheduler_common.h"
#include "../../src/tbb/governor.h"
#include "../../src/tbb/misc.h"
#include "tbb/cache_aligned_allocator.h"

#include "ipc_utils.h"

// POSIX headers used directly below (semaphores, errno, clock_gettime, getpid).
#include <fcntl.h>
#include <semaphore.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <stdlib.h>

namespace rml {
namespace internal {

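// The IPC server is strictly opt-in: unless the IPC_ENABLE environment variable
// is set, __RML_open_factory() below reports st_incompatible and this library
// is rejected by the factory loader.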
static const char* IPC_ENABLE_VAR_NAME = "IPC_ENABLE";

typedef versioned_object::version_type version_type;

extern "C" factory::status_type __RML_open_factory(factory& f, version_type& /*server_version*/, version_type /*client_version*/) {
    if( !tbb::internal::rml::get_enable_flag( IPC_ENABLE_VAR_NAME ) ) {
        return factory::st_incompatible;
    }

    // Hack to keep this library from being closed
    static std::atomic<bool> one_time_flag{false};
    bool expected = false;

    if( one_time_flag.compare_exchange_strong(expected, true) ) {
        __TBB_ASSERT( (size_t)f.library_handle!=factory::c_dont_unload, nullptr );
#if _WIN32||_WIN64
        f.library_handle = reinterpret_cast<HMODULE>(factory::c_dont_unload);
#else
        f.library_handle = reinterpret_cast<void*>(factory::c_dont_unload);
#endif
    }
    // End of hack

    return factory::st_success;
}

extern "C" void __RML_close_factory(factory& /*f*/) {
}

class ipc_thread_monitor : public tbb::detail::r1::rml::internal::thread_monitor {
public:
    ipc_thread_monitor() : thread_monitor() {}

#if USE_WINTHREAD
#elif USE_PTHREAD
    static handle_type launch(thread_routine_type thread_routine, void* arg, size_t stack_size);
#endif
};
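// Note: only the pthread launch path is specialized here; under USE_WINTHREAD
// the base thread_monitor launch is used as-is.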

#if USE_WINTHREAD
#elif USE_PTHREAD
inline ipc_thread_monitor::handle_type ipc_thread_monitor::launch(void* (*thread_routine)(void*), void* arg, size_t stack_size) {
    pthread_attr_t s;
    if( pthread_attr_init( &s ) ) return 0;
    if( stack_size>0 ) {
        if( pthread_attr_setstacksize( &s, stack_size ) ) return 0;
    }
    pthread_t handle;
    if( pthread_create( &handle, &s, thread_routine, arg ) ) return 0;
    if( pthread_attr_destroy( &s ) ) return 0;
    return handle;
}
#endif

}} // rml::internal

using rml::internal::ipc_thread_monitor;
using tbb::internal::rml::get_shared_name;

namespace tbb {
namespace detail {

namespace r1 {
bool terminate_on_exception() {
    return false;
}
}

namespace rml {

typedef ipc_thread_monitor::handle_type thread_handle;

class ipc_server;

static const char* IPC_MAX_THREADS_VAR_NAME = "MAX_THREADS";
static const char* IPC_ACTIVE_SEM_PREFIX = "/__IPC_active";
static const char* IPC_STOP_SEM_PREFIX = "/__IPC_stop";
static const char* IPC_ACTIVE_SEM_VAR_NAME = "IPC_ACTIVE_SEMAPHORE";
static const char* IPC_STOP_SEM_VAR_NAME = "IPC_STOP_SEMAPHORE";
static const mode_t IPC_SEM_MODE = 0660;
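
// Semaphore names can be overridden via the IPC_ACTIVE_SEMAPHORE and
// IPC_STOP_SEMAPHORE environment variables; otherwise a shared name is derived
// from the prefixes above by get_shared_name() (for example, a hypothetical
// "/__IPC_active_1000" -- the exact suffix is up to ipc_utils).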

static std::atomic<int> my_global_thread_count;
using tbb_client = tbb::detail::r1::rml::tbb_client;
using tbb_server = tbb::detail::r1::rml::tbb_server;
using tbb_factory = tbb::detail::r1::rml::tbb_factory;

using tbb::detail::r1::runtime_warning;

char* get_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    char* value = std::getenv(name);
    std::size_t len = value == nullptr ? 0 : std::strlen(value);
    if (len > 0) {
        // TODO: consider returning the original string instead of the copied string.
        char* sem_name = new char[len + 1];
        __TBB_ASSERT(sem_name != nullptr, nullptr);
        std::strncpy(sem_name, value, len + 1);
        __TBB_ASSERT(sem_name[len] == 0, nullptr);
        return sem_name;
    } else {
        return get_shared_name(prefix);
    }
}

char* get_active_sem_name() {
    return get_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

char* get_stop_sem_name() {
    return get_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

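// my_global_thread_count tracks how many permits this process has taken from
// the active-threads semaphore; release_thread_sem() returns one permit, if
// any are held, so that other processes can run a thread.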
static void release_thread_sem(sem_t* my_sem) {
    int old = my_global_thread_count.load(std::memory_order_relaxed);
    do {
        if( old<=0 ) return;
    } while( !my_global_thread_count.compare_exchange_strong(old, old-1) );
    if( old>0 ) {
        sem_post( my_sem );
    }
}

void set_sem_name(const char* name, const char* prefix) {
    __TBB_ASSERT(name != nullptr, nullptr);
    __TBB_ASSERT(prefix != nullptr, nullptr);
    const char* postfix = "_XXXXXX";
    std::size_t plen = std::strlen(prefix);
    std::size_t xlen = std::strlen(postfix);
    char* templ = new char[plen + xlen + 1];
    __TBB_ASSERT(templ != nullptr, nullptr);
    strncpy(templ, prefix, plen + 1);
    __TBB_ASSERT(templ[plen] == 0, nullptr);
    strncat(templ, postfix, xlen + 1);
    __TBB_ASSERT(std::strlen(templ) == plen + xlen, nullptr);
    // TODO: consider using mkstemp instead of mktemp.
    char* sem_name = mktemp(templ);
    if (sem_name != nullptr) {
        int status = setenv(name, sem_name, /*overwrite*/ 1);
        __TBB_ASSERT_EX(status == 0, nullptr);
    }
    delete[] templ;
}

extern "C" void set_active_sem_name() {
    set_sem_name(IPC_ACTIVE_SEM_VAR_NAME, IPC_ACTIVE_SEM_PREFIX);
}

extern "C" void set_stop_sem_name() {
    set_sem_name(IPC_STOP_SEM_VAR_NAME, IPC_STOP_SEM_PREFIX);
}

extern "C" void release_resources() {
    if( my_global_thread_count.load(std::memory_order_acquire)!=0 ) {
        char* active_sem_name = get_active_sem_name();
        // O_CREAT requires the mode and initial-value arguments as well.
        sem_t* my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
        __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
        delete[] active_sem_name;

        do {
            release_thread_sem( my_active_sem );
        } while( my_global_thread_count.load(std::memory_order_acquire)!=0 );
    }
}

extern "C" void release_semaphores() {
    int status = 0;
    char* sem_name = nullptr;

    sem_name = get_active_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning("Can not get RML semaphore name");
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;

    sem_name = get_stop_sem_name();
    if( sem_name==nullptr ) {
        runtime_warning( "Can not get RML semaphore name" );
        return;
    }
    status = sem_unlink( sem_name );
    if( status!=0 ) {
        if( errno==ENOENT ) {
            /* There is no semaphore with the given name, nothing to do */
        } else {
            runtime_warning("Can not release RML semaphore");
            delete[] sem_name;
            return;
        }
    }
    delete[] sem_name;
}

class ipc_worker: no_copy {
protected:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
                       /----------stop---\
                       |           ^     |
                       V           |     |
            init --> starting --> normal |
              |         |           |    |
              |         V           |    |
              \------> quit <------/<----/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread is stopped but can be started again.
        st_stop,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    ipc_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    ipc_thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    ipc_worker* my_next;

    friend class ipc_server;

    //! Actions executed by the associated thread
    void run();

    //! Wake up associated thread (or launch a thread if there is none)
    bool wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown(bool join);

    //! Called by a thread (usually not the associated thread) to commence stopping.
    void start_stopping(bool join);

    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);

    static void release_handle(thread_handle my_handle, bool join);

protected:
    ipc_worker(ipc_server& server, tbb_client& client, const size_t i) :
        my_server(server),
        my_client(client),
        my_index(i)
    {
        my_state = st_init;
    }
};

// TODO: cannot bind to nfs_size from allocator.cpp since nfs_size is constexpr defined in another translation unit
constexpr static size_t cache_line_sz = 128;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
class padded_ipc_worker: public ipc_worker {
    char pad[cache_line_sz - sizeof(ipc_worker)%cache_line_sz];
public:
    padded_ipc_worker(ipc_server& server, tbb_client& client, const size_t i)
        : ipc_worker( server, client, i ) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class ipc_waker : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_waker(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_stopper : public padded_ipc_worker {
private:
    static __RML_DECL_THREAD_ROUTINE thread_routine(void* arg);
    void run();
    bool wake_or_launch();

    friend class ipc_server;

public:
    ipc_stopper(ipc_server& server, tbb_client& client, const size_t i)
        : padded_ipc_worker( server, client, i ) {}
};

class ipc_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_ipc_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<ipc_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

    //! Whether the server should join workers during termination
    const bool my_join_workers;

    //! Service thread for waking up workers
    ipc_waker* my_waker;

    //! Service thread for stopping workers
    ipc_stopper* my_stopper;

    //! Semaphore counting active threads
    sem_t* my_active_sem;

    //! Semaphore counting threads to be stopped
    sem_t* my_stop_sem;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_slack.load(std::memory_order_acquire)>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
                wake_some( 0, active_threads );
            }
        }
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list(ipc_worker& t);

    //! Try to add t to list of sleeping workers even if there is some work to do
    bool try_insert_in_asleep_list_forced(ipc_worker& t);

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some(int additional_slack, int active_threads);

    //! Equivalent of adding additional_slack to my_slack and waking up to 1 thread if my_slack permits.
    void wake_one_forced(int additional_slack);

    //! Stop one thread from asleep list
    bool stop_one();

    //! Wait for active thread
    bool wait_active_thread();

    //! Try to get active thread
    bool try_get_active_thread();

    //! Release active thread
    void release_active_thread();

    //! Wait for thread to stop
    bool wait_stop_thread();

    //! Add thread to stop list
    void add_stop_thread();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~ipc_server();
            tbb::cache_aligned_allocator<ipc_server>().deallocate( this, 1 );
        }
    }

    friend class ipc_worker;
    friend class ipc_waker;
    friend class ipc_stopper;
public:
    ipc_server(tbb_client& client);
    virtual ~ipc_server();

    version_type version() const override {
        return 0;
    }

    void request_close_connection(bool /*exiting*/) override {
        my_waker->start_shutdown(false);
        my_stopper->start_shutdown(false);
        for( size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown( my_join_workers );
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed(int) override { __TBB_ASSERT( false, nullptr ); }

    unsigned default_concurrency() const override { return my_n_thread - 1; }

    void adjust_job_count_estimate(int delta) override;

#if _WIN32||_WIN64
    void register_external_thread(::rml::server::execution_resource_t&) override {}
    void unregister_external_thread(::rml::server::execution_resource_t) override {}
#endif /* _WIN32||_WIN64 */
};
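
// Lifetime note: my_ref_count starts at 1 for the connection itself; each
// successfully launched thread (worker, waker, stopper) adds one reference in
// wake_or_launch() and drops it at the end of its run(), so the server object
// is destroyed only after request_close_connection() and the exit of every
// launched thread.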

//------------------------------------------------------------------------
// Methods of ipc_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_worker::thread_routine(void* arg) {
    ipc_worker* self = static_cast<ipc_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_worker::release_handle(thread_handle handle, bool join) {
    if( join )
        ipc_thread_monitor::join( handle );
    else
        ipc_thread_monitor::detach_thread( handle );
}

void ipc_worker::start_shutdown(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    do {
        __TBB_ASSERT( s!=st_quit, nullptr );
    } while( !my_state.compare_exchange_strong( s, st_quit ) );
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::start_stopping(bool join) {
    state_t s = my_state.load(std::memory_order_relaxed);

    // Move the worker to st_stop, so that it can be relaunched later, unlike
    // start_shutdown() above, which moves it to the terminal st_quit state.
    while( !my_state.compare_exchange_strong( s, st_stop ) ) {}
    if( s==st_normal || s==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_stop overrides checking of my_slack.
        my_thread_monitor.notify();
        // Do not need release handle in st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting release is done at launch site.
        if( s==st_normal )
            release_handle( my_handle, join );
    }
}

void ipc_worker::run() {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    state_t state = my_state.load(std::memory_order_acquire);
    while( state!=st_quit && state!=st_stop ) {
        if( my_server.my_slack>=0 ) {
            my_client.process(j);
        } else {
            // Check/set the invariant for sleeping
            state = my_state.load(std::memory_order_seq_cst);
            if( state!=st_quit && state!=st_stop && my_server.try_insert_in_asleep_list(*this) ) {
                if( my_server.my_n_thread > 1 ) my_server.release_active_thread();
                my_thread_monitor.wait();
                my_server.propagate_chain_reaction();
            }
        }
        // memory_order_seq_cst to be strictly ordered after thread_monitor::wait
        state = my_state.load(std::memory_order_seq_cst);
    }
    my_client.cleanup(j);

    my_server.remove_server_ref();
}

inline bool ipc_worker::wake_or_launch() {
    state_t expected_stop = st_stop, expected_init = st_init;
    if( ( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected_init, st_starting ) ) ||
        ( my_state.load(std::memory_order_acquire)==st_stop && my_state.compare_exchange_strong( expected_stop, st_starting ) ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                // Unable to create new thread for process.
                // However, this is an expected situation for the use cases of this coordination server.
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might be not set yet
                    // at time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}
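
// A false return from wake_or_launch() means no OS thread could be created;
// the callers (wake_some, wake_one_forced, adjust_job_count_estimate) then
// post to the stop semaphore and, for workers, park the worker back on the
// asleep list so that another process can pick up the slack.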

//------------------------------------------------------------------------
// Methods of ipc_waker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_waker::thread_routine(void* arg) {
    ipc_waker* self = static_cast<ipc_waker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_waker::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
        bool have_to_sleep = false;
        if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
            if( my_server.wait_active_thread() ) {
                if( my_server.my_slack.load(std::memory_order_acquire)>0 ) {
                    my_server.wake_some( 0, 1 );
                } else {
                    my_server.release_active_thread();
                    have_to_sleep = true;
                }
            }
        } else {
            have_to_sleep = true;
        }
        if( have_to_sleep ) {
            // Check/set the invariant for sleeping
            if( my_server.my_slack.load(std::memory_order_acquire)<0 ) {
                my_thread_monitor.wait();
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_waker::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might be not set yet
                    // at time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_stopper
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE ipc_stopper::thread_routine(void* arg) {
    ipc_stopper* self = static_cast<ipc_stopper*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void ipc_stopper::run() {
    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.wait_stop_thread() ) {
            if( my_state.load(std::memory_order_acquire)!=st_quit ) {
                if( !my_server.stop_one() ) {
                    my_server.add_stop_thread();
                    tbb::detail::r1::prolonged_pause();
                }
            }
        }
    }

    my_server.remove_server_ref();
}

inline bool ipc_stopper::wake_or_launch() {
    state_t expected = st_init;
    if( my_state.load(std::memory_order_acquire)==st_init && my_state.compare_exchange_strong( expected, st_starting ) ) {
        // after this point, remove_server_ref() must be done by created thread
#if USE_WINTHREAD
        my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif USE_PTHREAD
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = ipc_thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            if( my_handle == 0 ) {
                runtime_warning( "Unable to create new thread for process %d", getpid() );
                state_t s = st_starting;
                my_state.compare_exchange_strong( s, st_init );
                if( st_starting != s ) {
                    // Do shutdown during startup. my_handle can't be released
                    // by start_shutdown, because my_handle value might be not set yet
                    // at time of transition from st_starting to st_quit.
                    __TBB_ASSERT( s==st_quit, nullptr );
                    release_handle( my_handle, my_server.my_join_workers );
                }
                return false;
            } else {
                my_server.my_ref_count++;
            }
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* USE_PTHREAD */
        state_t s = st_starting;
        my_state.compare_exchange_strong( s, st_normal );
        if( st_starting!=s ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
            __TBB_ASSERT( s==st_quit, nullptr );
            release_handle( my_handle, my_server.my_join_workers );
        }
    }
    else {
        my_thread_monitor.notify();
    }

    return true;
}

//------------------------------------------------------------------------
// Methods of ipc_server
//------------------------------------------------------------------------
ipc_server::ipc_server(tbb_client& client) :
    my_client( client ),
    my_stack_size( client.min_stack_size() ),
    my_thread_array(nullptr),
    my_join_workers(false),
    my_waker(nullptr),
    my_stopper(nullptr)
{
    my_ref_count = 1;
    my_slack = 0;
#if TBB_USE_ASSERT
    my_net_slack_requests = 0;
#endif /* TBB_USE_ASSERT */
    my_n_thread = tbb::internal::rml::get_num_threads(IPC_MAX_THREADS_VAR_NAME);
    if( my_n_thread==0 ) {
        my_n_thread = tbb::detail::r1::AvailableHwConcurrency();
        __TBB_ASSERT( my_n_thread>0, nullptr );
    }

    my_asleep_list_root = nullptr;
    my_thread_array = tbb::cache_aligned_allocator<padded_ipc_worker>().allocate( my_n_thread );
    for( size_t i=0; i<my_n_thread; ++i ) {
        ipc_worker* t = new( &my_thread_array[i] ) padded_ipc_worker( *this, client, i );
        t->my_next = my_asleep_list_root;
        my_asleep_list_root = t;
    }

    my_waker = tbb::cache_aligned_allocator<ipc_waker>().allocate(1);
    new( my_waker ) ipc_waker( *this, client, my_n_thread );

    my_stopper = tbb::cache_aligned_allocator<ipc_stopper>().allocate(1);
    new( my_stopper ) ipc_stopper( *this, client, my_n_thread + 1 );

    char* active_sem_name = get_active_sem_name();
    my_active_sem = sem_open( active_sem_name, O_CREAT, IPC_SEM_MODE, my_n_thread - 1 );
    __TBB_ASSERT( my_active_sem, "Unable to open active threads semaphore" );
    delete[] active_sem_name;

    char* stop_sem_name = get_stop_sem_name();
    my_stop_sem = sem_open( stop_sem_name, O_CREAT, IPC_SEM_MODE, 0 );
    __TBB_ASSERT( my_stop_sem, "Unable to open stop threads semaphore" );
    delete[] stop_sem_name;
}

ipc_server::~ipc_server() {
    __TBB_ASSERT( my_net_slack_requests.load(std::memory_order_relaxed)==0, nullptr );

    for( size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_ipc_worker();
    tbb::cache_aligned_allocator<padded_ipc_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::d0::poison_pointer( my_thread_array );

    my_waker->~ipc_waker();
    tbb::cache_aligned_allocator<ipc_waker>().deallocate( my_waker, 1 );
    tbb::detail::d0::poison_pointer( my_waker );

    my_stopper->~ipc_stopper();
    tbb::cache_aligned_allocator<ipc_stopper>().deallocate( my_stopper, 1 );
    tbb::detail::d0::poison_pointer( my_stopper );

    sem_close( my_active_sem );
    sem_close( my_stop_sem );
}

inline bool ipc_server::try_insert_in_asleep_list(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

inline bool ipc_server::try_insert_in_asleep_list_forced(ipc_worker& t) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire( my_asleep_list_mutex ) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    ++my_slack;
    t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
    my_asleep_list_root.store(&t, std::memory_order_relaxed);
    return true;
}

inline bool ipc_server::wait_active_thread() {
    if( sem_wait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline bool ipc_server::try_get_active_thread() {
    if( sem_trywait( my_active_sem ) == 0 ) {
        ++my_global_thread_count;
        return true;
    }
    return false;
}

inline void ipc_server::release_active_thread() {
    release_thread_sem( my_active_sem );
}

inline bool ipc_server::wait_stop_thread() {
    // Wait for a stop request for at most one second, so that the stopper
    // thread regularly rechecks my_state for st_quit.
    struct timespec ts;
    if( clock_gettime( CLOCK_REALTIME, &ts )==0 ) {
        ts.tv_sec++;
        if( sem_timedwait( my_stop_sem, &ts )==0 ) {
            return true;
        }
    }
    return false;
}

inline void ipc_server::add_stop_thread() {
    sem_post( my_stop_sem );
}

void ipc_server::wake_some( int additional_slack, int active_threads ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[2];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( active_threads>0 && my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed
            );
            --active_threads;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
            release_active_thread();
        }
    }
    while( active_threads ) {
        release_active_thread();
        --active_threads;
    }
}

void ipc_server::wake_one_forced( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    ipc_worker* wakee[1];
    ipc_worker **w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+1 ) {
            if( additional_slack>0 ) {
                if( additional_slack+my_slack.load(std::memory_order_acquire)<=0 ) // additional demand does not exceed surplus supply
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim unit of slack
                int old;
                do {
                    old = my_slack.load(std::memory_order_relaxed);
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong( old, old-1 ) );
            }
            // Pop sleeping worker to combine with claimed unit of slack
            my_asleep_list_root.store(
                (*w++ = my_asleep_list_root.load(std::memory_order_relaxed))->my_next,
                std::memory_order_relaxed);
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        if( !(*--w)->wake_or_launch() ) {
            add_stop_thread();
            do {
            } while( !try_insert_in_asleep_list_forced(**w) );
        }
    }
}

bool ipc_server::stop_one() {
    ipc_worker* current = nullptr;
    ipc_worker* next = nullptr;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        if( my_asleep_list_root.load(std::memory_order_relaxed) ) {
            current = my_asleep_list_root.load(std::memory_order_relaxed);
            if( current->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                next = current->my_next;
                while( next!=nullptr && next->my_state.load(std::memory_order_relaxed)==ipc_worker::st_normal ) {
                    current = next;
                    next = current->my_next;
                }
                current->start_stopping( my_join_workers );
                return true;
            }
        }
    }
    return false;
}

void ipc_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( my_n_thread > 1 ) {
        if( delta<0 ) {
            my_slack += delta;
        } else if( delta>0 ) {
            int active_threads = 0;
            if( try_get_active_thread() ) {
                ++active_threads;
                if( try_get_active_thread() ) {
                    ++active_threads;
                }
            }
            wake_some( delta, active_threads );

            if( !my_waker->wake_or_launch() ) {
                add_stop_thread();
            }
            if( !my_stopper->wake_or_launch() ) {
                add_stop_thread();
            }
        }
    } else { // Corner case: the server should not provide any worker threads, but the client still needs at least one
        if( delta<0 ) {
            my_slack += delta;
        } else {
            wake_one_forced( delta );
        }
    }
}

//------------------------------------------------------------------------
// RML factory methods
//------------------------------------------------------------------------

#if USE_PTHREAD

static tbb_client* my_global_client = nullptr;
static tbb_server* my_global_server = nullptr;

void rml_atexit() {
    release_resources();
}

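// After fork(), the child inherits the server object but none of its threads
// exist in the child process; rebuild the server in place so the child can
// launch workers of its own, and re-register the fork/exit hooks.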
void rml_atfork_child() {
    if( my_global_server!=nullptr && my_global_client!=nullptr ) {
        ipc_server* server = static_cast<ipc_server*>( my_global_server );
        server->~ipc_server();
        // memset( server, 0, sizeof(ipc_server) );
        new( server ) ipc_server( *my_global_client );
        pthread_atfork( nullptr, nullptr, rml_atfork_child );
        atexit( rml_atexit );
    }
}

#endif /* USE_PTHREAD */

extern "C" tbb_factory::status_type __TBB_make_rml_server(tbb_factory& /*f*/, tbb_server*& server, tbb_client& client) {
    server = new( tbb::cache_aligned_allocator<ipc_server>().allocate(1) ) ipc_server(client);
#if USE_PTHREAD
    my_global_client = &client;
    my_global_server = server;
    pthread_atfork( nullptr, nullptr, rml_atfork_child );
    atexit( rml_atexit );
#endif /* USE_PTHREAD */
    if( getenv( "RML_DEBUG" ) ) {
        runtime_warning("IPC server is started");
    }
    return tbb_factory::st_success;
}

extern "C" void __TBB_call_with_my_server_info(::rml::server_info_callback_t /*cb*/, void* /*arg*/) {
}

} // namespace rml
} // namespace detail

} // namespace tbb