1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // All platform-specific threading support is encapsulated here. */ 18 19 #ifndef __RML_thread_monitor_H 20 #define __RML_thread_monitor_H 21 22 #if __TBB_USE_WINAPI 23 #include <windows.h> 24 #include <process.h> 25 #include <malloc.h> //_alloca 26 #include "misc.h" // support for processor groups 27 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) 28 #include <thread> 29 #endif 30 #elif __TBB_USE_POSIX 31 #include <pthread.h> 32 #include <cstring> 33 #include <cstdlib> 34 #else 35 #error Unsupported platform 36 #endif 37 #include <cstdio> 38 39 #include "oneapi/tbb/detail/_template_helpers.h" 40 41 #include "itt_notify.h" 42 #include "semaphore.h" 43 44 // All platform-specific threading support is in this header. 45 46 #if (_WIN32||_WIN64)&&!__TBB_ipf 47 // Deal with 64K aliasing. The formula for "offset" is a Fibonacci hash function, 48 // which has the desirable feature of spreading out the offsets fairly evenly 49 // without knowing the total number of offsets, and furthermore unlikely to 50 // accidentally cancel out other 64K aliasing schemes that Microsoft might implement later. 51 // See Knuth Vol 3. "Theorem S" for details on Fibonacci hashing. 52 // The second statement is really does need "volatile", otherwise the compiler might remove the _alloca. 53 #define AVOID_64K_ALIASING(idx) \ 54 std::size_t offset = (idx+1) * 40503U % (1U<<16); \ 55 void* volatile sink_for_alloca = _alloca(offset); \ 56 __TBB_ASSERT_EX(sink_for_alloca, "_alloca failed"); 57 #else 58 // Linux thread allocators avoid 64K aliasing. 59 #define AVOID_64K_ALIASING(idx) tbb::detail::suppress_unused_warning(idx) 60 #endif /* _WIN32||_WIN64 */ 61 62 namespace tbb { 63 namespace detail { 64 namespace r1 { 65 66 // Forward declaration: throws std::runtime_error with what() returning error_code description prefixed with aux_info 67 void handle_perror(int error_code, const char* aux_info); 68 69 namespace rml { 70 namespace internal { 71 72 #if __TBB_USE_ITT_NOTIFY 73 static const ::tbb::detail::r1::tchar *SyncType_RML = _T("%Constant"); 74 static const ::tbb::detail::r1::tchar *SyncObj_ThreadMonitor = _T("RML Thr Monitor"); 75 #endif /* __TBB_USE_ITT_NOTIFY */ 76 77 //! Monitor with limited two-phase commit form of wait. 78 /** At most one thread should wait on an instance at a time. */ 79 class thread_monitor { 80 public: 81 class cookie { 82 friend class thread_monitor; 83 std::atomic<std::size_t> my_epoch{0}; 84 }; 85 thread_monitor() : skipped_wakeup(false), my_sema() { 86 ITT_SYNC_CREATE(&my_sema, SyncType_RML, SyncObj_ThreadMonitor); 87 } 88 ~thread_monitor() {} 89 90 //! If a thread is waiting or started a two-phase wait, notify it. 91 /** Can be called by any thread. */ 92 void notify(); 93 94 //! Begin two-phase wait. 95 /** Should only be called by thread that owns the monitor. 96 The caller must either complete the wait or cancel it. */ 97 void prepare_wait( cookie& c ); 98 99 //! Complete a two-phase wait and wait until notification occurs after the earlier prepare_wait. 100 void commit_wait( cookie& c ); 101 102 //! Cancel a two-phase wait. 103 void cancel_wait(); 104 105 #if __TBB_USE_WINAPI 106 typedef HANDLE handle_type; 107 108 #define __RML_DECL_THREAD_ROUTINE unsigned WINAPI 109 typedef unsigned (WINAPI *thread_routine_type)(void*); 110 111 //! Launch a thread 112 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const size_t* worker_index = NULL ); 113 114 #elif __TBB_USE_POSIX 115 typedef pthread_t handle_type; 116 117 #define __RML_DECL_THREAD_ROUTINE void* 118 typedef void*(*thread_routine_type)(void*); 119 120 //! Launch a thread 121 static handle_type launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size ); 122 #endif /* __TBB_USE_POSIX */ 123 124 //! Join thread 125 static void join(handle_type handle); 126 127 //! Detach thread 128 static void detach_thread(handle_type handle); 129 private: 130 cookie my_cookie; // epoch counter 131 std::atomic<bool> in_wait{false}; 132 bool skipped_wakeup; 133 binary_semaphore my_sema; 134 #if __TBB_USE_POSIX 135 static void check( int error_code, const char* routine ); 136 #endif 137 }; 138 139 #if __TBB_USE_WINAPI 140 141 #ifndef STACK_SIZE_PARAM_IS_A_RESERVATION 142 #define STACK_SIZE_PARAM_IS_A_RESERVATION 0x00010000 143 #endif 144 145 // _beginthreadex API is not available in Windows 8 Store* applications, so use std::thread instead 146 #if __TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) 147 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_function, void* arg, std::size_t, const std::size_t*) { 148 //TODO: check that exception thrown from std::thread is not swallowed silently 149 std::thread* thread_tmp=new std::thread(thread_function, arg); 150 return thread_tmp->native_handle(); 151 } 152 #else 153 inline thread_monitor::handle_type thread_monitor::launch( thread_routine_type thread_routine, void* arg, std::size_t stack_size, const std::size_t* worker_index ) { 154 unsigned thread_id; 155 int number_of_processor_groups = ( worker_index ) ? NumberOfProcessorGroups() : 0; 156 unsigned create_flags = ( number_of_processor_groups > 1 ) ? CREATE_SUSPENDED : 0; 157 HANDLE h = (HANDLE)_beginthreadex( NULL, unsigned(stack_size), thread_routine, arg, STACK_SIZE_PARAM_IS_A_RESERVATION | create_flags, &thread_id ); 158 if( !h ) { 159 handle_perror(0, "thread_monitor::launch: _beginthreadex failed\n"); 160 } 161 if ( number_of_processor_groups > 1 ) { 162 MoveThreadIntoProcessorGroup( h, FindProcessorGroupIndex( static_cast<int>(*worker_index) ) ); 163 ResumeThread( h ); 164 } 165 return h; 166 } 167 #endif //__TBB_WIN8UI_SUPPORT && (_WIN32_WINNT < 0x0A00) 168 169 void thread_monitor::join(handle_type handle) { 170 #if TBB_USE_ASSERT 171 DWORD res = 172 #endif 173 WaitForSingleObjectEx(handle, INFINITE, FALSE); 174 __TBB_ASSERT( res==WAIT_OBJECT_0, NULL ); 175 #if TBB_USE_ASSERT 176 BOOL val = 177 #endif 178 CloseHandle(handle); 179 __TBB_ASSERT( val, NULL ); 180 } 181 182 void thread_monitor::detach_thread(handle_type handle) { 183 #if TBB_USE_ASSERT 184 BOOL val = 185 #endif 186 CloseHandle(handle); 187 __TBB_ASSERT( val, NULL ); 188 } 189 190 #endif /* __TBB_USE_WINAPI */ 191 192 #if __TBB_USE_POSIX 193 inline void thread_monitor::check( int error_code, const char* routine ) { 194 if( error_code ) { 195 handle_perror(error_code, routine); 196 } 197 } 198 199 inline thread_monitor::handle_type thread_monitor::launch( void* (*thread_routine)(void*), void* arg, std::size_t stack_size ) { 200 // FIXME - consider more graceful recovery than just exiting if a thread cannot be launched. 201 // Note that there are some tricky situations to deal with, such that the thread is already 202 // grabbed as part of an OpenMP team. 203 pthread_attr_t s; 204 check(pthread_attr_init( &s ), "pthread_attr_init has failed"); 205 if( stack_size>0 ) 206 check(pthread_attr_setstacksize( &s, stack_size ), "pthread_attr_setstack_size has failed" ); 207 pthread_t handle; 208 check( pthread_create( &handle, &s, thread_routine, arg ), "pthread_create has failed" ); 209 check( pthread_attr_destroy( &s ), "pthread_attr_destroy has failed" ); 210 return handle; 211 } 212 213 void thread_monitor::join(handle_type handle) { 214 check(pthread_join(handle, NULL), "pthread_join has failed"); 215 } 216 217 void thread_monitor::detach_thread(handle_type handle) { 218 check(pthread_detach(handle), "pthread_detach has failed"); 219 } 220 #endif /* __TBB_USE_POSIX */ 221 222 inline void thread_monitor::notify() { 223 my_cookie.my_epoch.store(my_cookie.my_epoch.load(std::memory_order_acquire) + 1, std::memory_order_release); 224 bool do_signal = in_wait.exchange( false ); 225 if( do_signal ) 226 my_sema.V(); 227 } 228 229 inline void thread_monitor::prepare_wait( cookie& c ) { 230 if( skipped_wakeup ) { 231 // Lazily consume a signal that was skipped due to cancel_wait 232 skipped_wakeup = false; 233 my_sema.P(); // does not really wait on the semaphore 234 } 235 // Former c = my_cookie 236 c.my_epoch.store(my_cookie.my_epoch.load(std::memory_order_acquire), std::memory_order_release); 237 in_wait.store( true, std::memory_order_seq_cst ); 238 } 239 240 inline void thread_monitor::commit_wait( cookie& c ) { 241 bool do_it = ( c.my_epoch.load(std::memory_order_relaxed) == my_cookie.my_epoch.load(std::memory_order_acquire) ); 242 if( do_it ) { 243 my_sema.P(); 244 } else { 245 tbb::detail::atomic_backoff backoff; 246 while (in_wait.load(std::memory_order_relaxed)) { backoff.pause(); } 247 skipped_wakeup = true; 248 } 249 } 250 251 inline void thread_monitor::cancel_wait() { 252 // if not in_wait, then some thread has sent us a signal; 253 // it will be consumed by the next prepare_wait call 254 skipped_wakeup = ! in_wait.exchange( false ); 255 } 256 257 } // namespace internal 258 } // namespace rml 259 } // namespace r1 260 } // namespace detail 261 } // namespace tbb 262 263 #endif /* __RML_thread_monitor_H */ 264