1 /*
2 Copyright (c) 2005-2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
// All platform-specific threading support is encapsulated here.
18
19 #ifndef __RML_thread_monitor_H
20 #define __RML_thread_monitor_H
21
#if __TBB_USE_WINAPI
#include <windows.h>
#include <process.h>
#include <malloc.h> //_alloca
#include "misc.h" // support for processor groups
#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
#include <thread>
#endif
#elif __TBB_USE_POSIX
#include <pthread.h>
#include <cstring>
#include <cstdlib>
#include <time.h>
#else
#error Unsupported platform
#endif
#include <atomic>
#include <cerrno>
#include <cstddef>
#include <cstdio>
39
40 #include "oneapi/tbb/detail/_template_helpers.h"
41
42 #include "itt_notify.h"
43 #include "semaphore.h"
44
45 // All platform-specific threading support is in this header.
46
#if (_WIN32||_WIN64)&&!__TBB_ipf
// Deal with 64K aliasing. The formula for "offset" is a Fibonacci hash function,
// which has the desirable feature of spreading out the offsets fairly evenly
// without knowing the total number of offsets, and furthermore unlikely to
// accidentally cancel out other 64K aliasing schemes that Microsoft might implement later.
// See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing.
// The second statement really does need "volatile"; otherwise the compiler might remove the _alloca.
// The macro argument is parenthesized so that an expression argument binds as a single operand.
// Note: the expansion declares the locals `offset` and `sink_for_alloca` in the caller's scope.
#define AVOID_64K_ALIASING(idx) \
    std::size_t offset = ((idx)+1) * 40503U % (1U<<16); \
    void* volatile sink_for_alloca = _alloca(offset); \
    __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed");
#else
// Linux thread allocators avoid 64K aliasing.
#define AVOID_64K_ALIASING(idx) tbb::detail::suppress_unused_warning(idx)
#endif /* _WIN32||_WIN64 */
62
63 namespace tbb {
64 namespace detail {
65 namespace r1 {
66
// Forward declaration: throws std::runtime_error with what() returning error_code description prefixed with aux_info.
// It is defined outside this header. The error paths in this file appear to rely on it not returning normally.
void handle_perror(int error_code, const char* aux_info);
69
70 namespace rml {
71 namespace internal {
72
#if __TBB_USE_ITT_NOTIFY
// Type and name strings handed to ITT_SYNC_CREATE for each thread_monitor's semaphore.
// The pointers themselves are const so that no translation unit can repoint them.
static const ::tbb::detail::r1::tchar* const SyncType_RML = _T("%Constant");
static const ::tbb::detail::r1::tchar* const SyncObj_ThreadMonitor = _T("RML Thr Monitor");
#endif /* __TBB_USE_ITT_NOTIFY */
77
78 //! Monitor with limited two-phase commit form of wait.
79 /** At most one thread should wait on an instance at a time. */
80 class thread_monitor {
81 public:
thread_monitor()82 thread_monitor() {
83 ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor);
84 }
~thread_monitor()85 ~thread_monitor() {}
86
87 //! Notify waiting thread
88 /** Can be called by any thread. */
89 void notify();
90
91 //! Wait for notification
92 void wait();
93
94 #if __TBB_USE_WINAPI
95 typedef HANDLE handle_type;
96
97 #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI
98 typedef unsigned (WINAPI *thread_routine_type)(void*);
99
100 //! Launch a thread
101 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const size_t* worker_index = nullptr );
102
103 #elif __TBB_USE_POSIX
104 typedef pthread_t handle_type;
105
106 #define __RML_DECL_THREAD_ROUTINE void*
107 typedef void*(*thread_routine_type)(void*);
108
109 //! Launch a thread
110 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size );
111 #endif /* __TBB_USE_POSIX */
112
113 //! Join thread
114 static void join(handle_type handle);
115
116 //! Detach thread
117 static void detach_thread(handle_type handle);
118 private:
119 // The protection from double notification of the binary semaphore
120 std::atomic<bool> my_notified{ false };
121 binary_semaphore my_sema;
122 #if __TBB_USE_POSIX
123 static void check( int error_code, const char* routine );
124 #endif
125 };
126
127 #if __TBB_USE_WINAPI
128
// Fallback value for SDK headers that do not define this thread-creation flag.
// launch() passes it to _beginthreadex. Per the Win32 documentation, the flag makes
// the stack size argument the stack reservation size.
#ifndef STACK_SIZE_PARAM_IS_A_RESERVATION
#define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000
#endif
132
133 // _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead
134 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
launch(thread_routine_type thread_function,void * arg,std::size_t,const std::size_t *)135 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, std::size_t, const std::size_t*) {
136 //TODO: check that exception thrown from std::thread is not swallowed silently
137 std::thread* thread_tmp=new std::thread(thread_function, arg);
138 return thread_tmp->native_handle();
139 }
140 #else
launch(thread_routine_type thread_routine,void * arg,std::size_t stack_size,const std::size_t * worker_index)141 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const std::size_t* worker_index ) {
142 unsigned thread_id;
143 int number_of_processor_groups = ( worker_index ) ? NumberOfProcessorGroups() : 0;
144 unsigned create_flags = ( number_of_processor_groups > 1 ) ? CREATE_SUSPENDED : 0;
145 HANDLE h = (HANDLE)_beginthreadex( nullptr, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id );
146 if( !h ) {
147 handle_perror(0, "thread_monitor::launch: _beginthreadex failed\n");
148 }
149 if ( number_of_processor_groups > 1 ) {
150 MoveThreadIntoProcessorGroup( h, FindProcessorGroupIndex( static_cast<int>(*worker_index) ) );
151 ResumeThread( h );
152 }
153 return h;
154 }
155 #endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
156
join(handle_type handle)157 void thread_monitor::join(handle_type handle) {
158 #if TBB_USE_ASSERT
159 DWORD res =
160 #endif
161 WaitForSingleObjectEx(handle, INFINITE, FALSE);
162 __TBB_ASSERT( res==WAIT_OBJECT_0, nullptr);
163 #if TBB_USE_ASSERT
164 BOOL val =
165 #endif
166 CloseHandle(handle);
167 __TBB_ASSERT( val, nullptr);
168 }
169
detach_thread(handle_type handle)170 void thread_monitor::detach_thread(handle_type handle) {
171 #if TBB_USE_ASSERT
172 BOOL val =
173 #endif
174 CloseHandle(handle);
175 __TBB_ASSERT( val, nullptr);
176 }
177
178 #endif /* __TBB_USE_WINAPI */
179
180 #if __TBB_USE_POSIX
check(int error_code,const char * routine)181 inline void thread_monitor::check( int error_code, const char* routine ) {
182 if( error_code ) {
183 handle_perror(error_code, routine);
184 }
185 }
186
launch(void * (* thread_routine)(void *),void * arg,std::size_t stack_size)187 inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, std::size_t stack_size ) {
188 // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched.
189 // Note that there are some tricky situations to deal with, such that the thread is already
190 // grabbed as part of an OpenMP team.
191 pthread_attr_t s;
192 check(pthread_attr_init( &s ), "pthread_attr_init has failed");
193 if( stack_size>0 )
194 check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size has failed" );
195
196 // pthread_create(2) can spuriously fail with EAGAIN. We retry
197 // max_num_tries times with progressively longer wait times.
198 pthread_t handle;
199 const int max_num_tries = 20;
200 int error = EAGAIN;
201
202 for (int i = 0; i < max_num_tries && error == EAGAIN; i++) {
203 if (i != 0) {
204 // Wait i milliseconds
205 struct timespec ts = {0, i * 1000 * 1000};
206 nanosleep(&ts, NULL);
207 }
208 error = pthread_create(&handle, &s, thread_routine, arg);
209 }
210
211 if (error)
212 handle_perror(error, "pthread_create has failed");
213
214 check( pthread_attr_destroy( &s ), "pthread_attr_destroy has failed" );
215 return handle;
216 }
217
join(handle_type handle)218 void thread_monitor::join(handle_type handle) {
219 check(pthread_join(handle, nullptr), "pthread_join has failed");
220 }
221
detach_thread(handle_type handle)222 void thread_monitor::detach_thread(handle_type handle) {
223 check(pthread_detach(handle), "pthread_detach has failed");
224 }
225 #endif /* __TBB_USE_POSIX */
226
notify()227 inline void thread_monitor::notify() {
228 // Check that the semaphore is not notified twice
229 if (!my_notified.exchange(true, std::memory_order_release)) {
230 my_sema.V();
231 }
232 }
233
//! Block until notify() posts the semaphore, then re-arm the monitor for the next notification.
inline void thread_monitor::wait() {
    // Consume the single pending post made by notify().
    my_sema.P();
    // Clearing the flag lets a later notify() post the semaphore again.
    // memory_order_seq_cst is required here to be ordered with
    // further load checking shutdown state
    my_notified.store(false, std::memory_order_seq_cst);
}
240
241 } // namespace internal
242 } // namespace rml
243 } // namespace r1
244 } // namespace detail
245 } // namespace tbb
246
247 #endif /* __RML_thread_monitor_H */
248