xref: /oneTBB/src/tbb/rml_thread_monitor.h (revision 137c1a88)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
// All platform-specific threading support is encapsulated here.
18 
19 #ifndef __RML_thread_monitor_H
20 #define __RML_thread_monitor_H
21 
#if __TBB_USE_WINAPI
#include <windows.h>
#include <process.h>
#include <malloc.h> //_alloca
#include "misc.h" // support for processor groups
#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
#include <thread>
#endif
#elif __TBB_USE_POSIX
#include <pthread.h>
#include <cstring>
#include <cstdlib>
#include <time.h>
#else
#error Unsupported platform
#endif
#include <atomic>
#include <cerrno>
#include <cstddef>
#include <cstdio>

#include "oneapi/tbb/detail/_template_helpers.h"

#include "itt_notify.h"
#include "semaphore.h"
44 
45 // All platform-specific threading support is in this header.
46 
#if (_WIN32||_WIN64)&&!__TBB_ipf
// Deal with 64K aliasing. The formula for "offset" is a Fibonacci hash function,
// which has the desirable feature of spreading out the offsets fairly evenly
// without knowing the total number of offsets, and furthermore is unlikely to
// accidentally cancel out other 64K aliasing schemes that Microsoft might implement later.
// See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing.
// The second statement really does need "volatile"; otherwise the compiler might remove the _alloca.
//
// The expansion declares the locals `offset` and `sink_for_alloca` in the enclosing scope,
// so it can be used at most once per scope. The argument is parenthesized so that compound
// expressions (e.g. `a ? b : c`) are fully evaluated before `+1` is applied.
#define AVOID_64K_ALIASING(idx)                              \
    std::size_t offset = ((idx)+1) * 40503U % (1U<<16);     \
    void* volatile sink_for_alloca = _alloca(offset);        \
    __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed");
#else
// Linux thread allocators avoid 64K aliasing.
#define AVOID_64K_ALIASING(idx) tbb::detail::suppress_unused_warning(idx)
#endif /* _WIN32||_WIN64 */
62 
63 namespace tbb {
64 namespace detail {
65 namespace r1 {
66 
// Forward declaration; the definition lives in another translation unit.
// Reports a failure by throwing std::runtime_error whose what() is the description of
// error_code prefixed with aux_info. The exact formatting is decided by that definition.
void handle_perror(int error_code, const char* aux_info);
69 
70 namespace rml {
71 namespace internal {
72 
#if __TBB_USE_ITT_NOTIFY
// Type and object names passed to ITT_SYNC_CREATE by the thread_monitor constructor
// when it registers each monitor's semaphore with the ITT instrumentation.
static const ::tbb::detail::r1::tchar *SyncType_RML = _T("%Constant");
static const ::tbb::detail::r1::tchar *SyncObj_ThreadMonitor = _T("RML Thr Monitor");
#endif /* __TBB_USE_ITT_NOTIFY */
77 
78 //! Monitor with limited two-phase commit form of wait.
79 /** At most one thread should wait on an instance at a time. */
80 class thread_monitor {
81 public:
thread_monitor()82     thread_monitor() {
83         ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor);
84     }
~thread_monitor()85     ~thread_monitor() {}
86 
87     //! Notify waiting thread
88     /** Can be called by any thread. */
89     void notify();
90 
91     //! Wait for notification
92     void wait();
93 
94 #if __TBB_USE_WINAPI
95     typedef HANDLE handle_type;
96 
97     #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI
98     typedef unsigned (WINAPI *thread_routine_type)(void*);
99 
100     //! Launch a thread
101     static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const size_t* worker_index = nullptr );
102 
103 #elif __TBB_USE_POSIX
104     typedef pthread_t handle_type;
105 
106     #define __RML_DECL_THREAD_ROUTINE void*
107     typedef void*(*thread_routine_type)(void*);
108 
109     //! Launch a thread
110     static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size );
111 #endif /* __TBB_USE_POSIX */
112 
113     //! Join thread
114     static void join(handle_type handle);
115 
116     //! Detach thread
117     static void detach_thread(handle_type handle);
118 private:
119     // The protection from double notification of the binary semaphore
120     std::atomic<bool> my_notified{ false };
121     binary_semaphore my_sema;
122 #if __TBB_USE_POSIX
123     static void check( int error_code, const char* routine );
124 #endif
125 };
126 
127 #if __TBB_USE_WINAPI
128 
// Fallback for headers that do not define this thread-creation flag, which launch() passes
// to _beginthreadex. Per the Win32 CreateThread documentation, it makes the stack size a
// reservation size rather than a commit size.
#ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
#endif
132 
133 // _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead
134 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
launch(thread_routine_type thread_function,void * arg,std::size_t,const std::size_t *)135 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, std::size_t, const std::size_t*) {
136 //TODO: check that exception thrown from std::thread is not swallowed silently
137     std::thread* thread_tmp=new std::thread(thread_function, arg);
138     return thread_tmp->native_handle();
139 }
140 #else
launch(thread_routine_type thread_routine,void * arg,std::size_t stack_size,const std::size_t * worker_index)141 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const std::size_t* worker_index ) {
142     unsigned thread_id;
143     int number_of_processor_groups = ( worker_index ) ? NumberOfProcessorGroups() : 0;
144     unsigned create_flags = ( number_of_processor_groups > 1 ) ? CREATE_SUSPENDED : 0;
145     HANDLE h = (HANDLE)_beginthreadex( nullptr, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id );
146     if( !h ) {
147         handle_perror(0, "thread_monitor::launch: _beginthreadex failed\n");
148     }
149     if ( number_of_processor_groups > 1 ) {
150         MoveThreadIntoProcessorGroup( h, FindProcessorGroupIndex( static_cast<int>(*worker_index) ) );
151         ResumeThread( h );
152     }
153     return h;
154 }
155 #endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
156 
join(handle_type handle)157 void thread_monitor::join(handle_type handle) {
158 #if TBB_USE_ASSERT
159     DWORD res =
160 #endif
161         WaitForSingleObjectEx(handle, INFINITE, FALSE);
162     __TBB_ASSERT( res==WAIT_OBJECT_0, nullptr);
163 #if TBB_USE_ASSERT
164     BOOL val =
165 #endif
166         CloseHandle(handle);
167     __TBB_ASSERT( val, nullptr);
168 }
169 
detach_thread(handle_type handle)170 void thread_monitor::detach_thread(handle_type handle) {
171 #if TBB_USE_ASSERT
172     BOOL val =
173 #endif
174         CloseHandle(handle);
175     __TBB_ASSERT( val, nullptr);
176 }
177 
178 #endif /* __TBB_USE_WINAPI */
179 
180 #if __TBB_USE_POSIX
check(int error_code,const char * routine)181 inline void thread_monitor::check( int error_code, const char* routine ) {
182     if( error_code ) {
183         handle_perror(error_code, routine);
184     }
185 }
186 
launch(void * (* thread_routine)(void *),void * arg,std::size_t stack_size)187 inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, std::size_t stack_size ) {
188     // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched.
189     // Note that there are some tricky situations to deal with, such that the thread is already
190     // grabbed as part of an OpenMP team.
191     pthread_attr_t s;
192     check(pthread_attr_init( &s ), "pthread_attr_init has failed");
193     if( stack_size>0 )
194         check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size has failed" );
195 
196     // pthread_create(2) can spuriously fail with EAGAIN. We retry
197     // max_num_tries times with progressively longer wait times.
198     pthread_t handle;
199     const int max_num_tries = 20;
200     int error = EAGAIN;
201 
202     for (int i = 0; i < max_num_tries && error == EAGAIN; i++) {
203       if (i != 0) {
204         // Wait i milliseconds
205         struct timespec ts = {0, i * 1000 * 1000};
206         nanosleep(&ts, NULL);
207       }
208       error = pthread_create(&handle, &s, thread_routine, arg);
209     }
210 
211     if (error)
212       handle_perror(error, "pthread_create has failed");
213 
214     check( pthread_attr_destroy( &s ), "pthread_attr_destroy has failed" );
215     return handle;
216 }
217 
join(handle_type handle)218 void thread_monitor::join(handle_type handle) {
219     check(pthread_join(handle, nullptr), "pthread_join has failed");
220 }
221 
detach_thread(handle_type handle)222 void thread_monitor::detach_thread(handle_type handle) {
223     check(pthread_detach(handle), "pthread_detach has failed");
224 }
225 #endif /* __TBB_USE_POSIX */
226 
notify()227 inline void thread_monitor::notify() {
228     // Check that the semaphore is not notified twice
229     if (!my_notified.exchange(true, std::memory_order_release)) {
230         my_sema.V();
231     }
232 }
233 
//! Block on the semaphore until notify() has posted it, then re-arm the monitor.
inline void thread_monitor::wait() {
    my_sema.P();
    // Clearing the flag lets the next notify() post the semaphore again.
    // memory_order_seq_cst is required here to be ordered with
    // further load checking shutdown state
    my_notified.store(false, std::memory_order_seq_cst);
}
240 
241 } // namespace internal
242 } // namespace rml
243 } // namespace r1
244 } // namespace detail
245 } // namespace tbb
246 
247 #endif /* __RML_thread_monitor_H */
248