/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

// All platform-specific threading support is encapsulated here.

#ifndef __RML_thread_monitor_H
#define __RML_thread_monitor_H

#if __TBB_USE_WINAPI
#include <windows.h>
#include <process.h>
#include <malloc.h> //_alloca
#include "misc.h" // support for processor groups
#if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00)
#include <thread>
#endif
#elif __TBB_USE_POSIX
#include <pthread.h>
#include <cstring>
#include <cstdlib>
#include <time.h>
#else
#error Unsupported platform
#endif
#include <cstdio>

#include "oneapi/tbb/detail/_template_helpers.h"

#include "itt_notify.h"
#include "semaphore.h"

// All platform-specific threading support is in this header.

#if (_WIN32||_WIN64)&&!__TBB_ipf
// Deal with 64K aliasing.  The formula for "offset" is a Fibonacci hash function,
// which has the desirable feature of spreading out the offsets fairly evenly
// without knowing the total number of offsets, and furthermore is unlikely to
// accidentally cancel out other 64K aliasing schemes that Microsoft might implement later.
// See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing.
// The second statement really does need "volatile", otherwise the compiler might remove the _alloca.
54 #define AVOID_64K_ALIASING(idx) \ 55 std::size_t offset = (idx+1) * 40503U % (1U<<16); \ 56 void* volatile sink_for_alloca = _alloca(offset); \ 57 __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed"); 58 #else 59 // Linux thread allocators avoid 64K aliasing. 60 #define AVOID_64K_ALIASING(idx) tbb::detail::suppress_unused_warning(idx) 61 #endif /* _WIN32||_WIN64 */ 62 63 namespace tbb { 64 namespace detail { 65 namespace r1 { 66 67 // Forward declaration: throws std::runtime_error with what() returning error_code description prefixed with aux_info 68 void handle_perror(int error_code, const char* aux_info); 69 70 namespace rml { 71 namespace internal { 72 73 #if __TBB_USE_ITT_NOTIFY 74 static const ::tbb::detail::r1::tchar *SyncType_RML = _T("%Constant"); 75 static const ::tbb::detail::r1::tchar *SyncObj_ThreadMonitor = _T("RML Thr Monitor"); 76 #endif /* __TBB_USE_ITT_NOTIFY */ 77 78 //! Monitor with limited two-phase commit form of wait. 79 /** At most one thread should wait on an instance at a time. */ 80 class thread_monitor { 81 public: 82 thread_monitor() { 83 ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor); 84 } 85 ~thread_monitor() {} 86 87 //! Notify waiting thread 88 /** Can be called by any thread. */ 89 void notify(); 90 91 //! Wait for notification 92 void wait(); 93 94 #if __TBB_USE_WINAPI 95 typedef HANDLE handle_type; 96 97 #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI 98 typedef unsigned (WINAPI *thread_routine_type)(void*); 99 100 //! Launch a thread 101 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const size_t* worker_index = nullptr ); 102 103 #elif __TBB_USE_POSIX 104 typedef pthread_t handle_type; 105 106 #define __RML_DECL_THREAD_ROUTINE void* 107 typedef void*(*thread_routine_type)(void*); 108 109 //! Launch a thread 110 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size ); 111 #endif /* __TBB_USE_POSIX */ 112 113 //! 
Join thread 114 static void join(handle_type handle); 115 116 //! Detach thread 117 static void detach_thread(handle_type handle); 118 private: 119 // The protection from double notification of the binary semaphore 120 std::atomic<bool> my_notified{ false }; 121 binary_semaphore my_sema; 122 #if __TBB_USE_POSIX 123 static void check( int error_code, const char* routine ); 124 #endif 125 }; 126 127 #if __TBB_USE_WINAPI 128 129 #ifndef STACK_SIZE_PARAM_IS_A_RESERVATION 130 #define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000 131 #endif 132 133 // _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead 134 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) 135 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, std::size_t, const std::size_t*) { 136 //TODO: check that exception thrown from std::thread is not swallowed silently 137 std::thread* thread_tmp=new std::thread(thread_function, arg); 138 return thread_tmp->native_handle(); 139 } 140 #else 141 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const std::size_t* worker_index ) { 142 unsigned thread_id; 143 int number_of_processor_groups = ( worker_index ) ? NumberOfProcessorGroups() : 0; 144 unsigned create_flags = ( number_of_processor_groups > 1 ) ? 
CREATE_SUSPENDED : 0; 145 HANDLE h = (HANDLE)_beginthreadex( nullptr, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id ); 146 if( !h ) { 147 handle_perror(0, "thread_monitor::launch: _beginthreadex failed\n"); 148 } 149 if ( number_of_processor_groups > 1 ) { 150 MoveThreadIntoProcessorGroup( h, FindProcessorGroupIndex( static_cast<int>(*worker_index) ) ); 151 ResumeThread( h ); 152 } 153 return h; 154 } 155 #endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) 156 157 void thread_monitor::join(handle_type handle) { 158 #if TBB_USE_ASSERT 159 DWORD res = 160 #endif 161 WaitForSingleObjectEx(handle, INFINITE, FALSE); 162 __TBB_ASSERT( res==WAIT_OBJECT_0, nullptr); 163 #if TBB_USE_ASSERT 164 BOOL val = 165 #endif 166 CloseHandle(handle); 167 __TBB_ASSERT( val, nullptr); 168 } 169 170 void thread_monitor::detach_thread(handle_type handle) { 171 #if TBB_USE_ASSERT 172 BOOL val = 173 #endif 174 CloseHandle(handle); 175 __TBB_ASSERT( val, nullptr); 176 } 177 178 #endif /* __TBB_USE_WINAPI */ 179 180 #if __TBB_USE_POSIX 181 inline void thread_monitor::check( int error_code, const char* routine ) { 182 if( error_code ) { 183 handle_perror(error_code, routine); 184 } 185 } 186 187 inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, std::size_t stack_size ) { 188 // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched. 189 // Note that there are some tricky situations to deal with, such that the thread is already 190 // grabbed as part of an OpenMP team. 191 pthread_attr_t s; 192 check(pthread_attr_init( &s ), "pthread_attr_init has failed"); 193 if( stack_size>0 ) 194 check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size has failed" ); 195 196 // pthread_create(2) can spuriously fail with EAGAIN. We retry 197 // max_num_tries times with progressively longer wait times. 
198 pthread_t handle; 199 const int max_num_tries = 20; 200 int error = EAGAIN; 201 202 for (int i = 0; i < max_num_tries && error == EAGAIN; i++) { 203 if (i != 0) { 204 // Wait i milliseconds 205 struct timespec ts = {0, i * 1000 * 1000}; 206 nanosleep(&ts, NULL); 207 } 208 error = pthread_create(&handle, &s, thread_routine, arg); 209 } 210 211 if (error) 212 handle_perror(error, "pthread_create has failed"); 213 214 check( pthread_attr_destroy( &s ), "pthread_attr_destroy has failed" ); 215 return handle; 216 } 217 218 void thread_monitor::join(handle_type handle) { 219 check(pthread_join(handle, nullptr), "pthread_join has failed"); 220 } 221 222 void thread_monitor::detach_thread(handle_type handle) { 223 check(pthread_detach(handle), "pthread_detach has failed"); 224 } 225 #endif /* __TBB_USE_POSIX */ 226 227 inline void thread_monitor::notify() { 228 // Check that the semaphore is not notified twice 229 if (!my_notified.exchange(true, std::memory_order_release)) { 230 my_sema.V(); 231 } 232 } 233 234 inline void thread_monitor::wait() { 235 my_sema.P(); 236 // memory_order_seq_cst is required here to be ordered with 237 // further load checking shutdown state 238 my_notified.store(false, std::memory_order_seq_cst); 239 } 240 241 } // namespace internal 242 } // namespace rml 243 } // namespace r1 244 } // namespace detail 245 } // namespace tbb 246 247 #endif /* __RML_thread_monitor_H */ 248