/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/cache_aligned_allocator.h"

#include "rml_tbb.h"
#include "rml_thread_monitor.h"

#include "scheduler_common.h"
#include "governor.h"
#include "misc.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace r1 {
namespace rml {

using rml::internal::thread_monitor;
typedef thread_monitor::handle_type thread_handle;

class private_server;

class private_worker: no_copy {
private:
    //! State in the finite-state machine that controls the worker.
    /** State diagram:
        init --> starting --> normal
          |         |           |
          |         V           |
          \------> quit <------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has an associated thread that is starting up.
        st_starting,
        //! Associated thread is doing its normal life sequence.
        st_normal,
        //! Associated thread has ended its normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const std::size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for the list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run() noexcept;

    //! Wake up the associated thread (or launch a thread if there is none)
    void wake_or_launch();
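
    // Transition summary, as read from the member functions of this class
    // (an illustrative note, not additional API):
    //   wake_or_launch():  st_init -> st_starting -> st_normal
    //   start_shutdown():  st_init/st_starting/st_normal -> st_quit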

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const std::size_t i ) :
        my_state(st_init), my_server(server), my_client(client), my_index(i),
        my_thread_monitor(), my_handle(), my_next()
    {}
};

static const std::size_t cache_line_size = tbb::detail::max_nfs_size;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an uninstantiable class.
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const std::size_t i )
        : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;

    //! Maximum number of threads to be created.
    /** Threads are created lazily, so the maximum might not actually be reached. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const std::size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus the number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, and so on. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. The second test is inside wake_some(0).
        if( my_asleep_list_root.load(std::memory_order_acquire) )
            wake_some(0);
    }
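
    // Why waking at most two workers suffices: each worker woken by wake_some()
    // calls propagate_chain_reaction() again when it resumes in run(), so
    // notifications fan out like a binary tree and n sleepers are reached in
    // roughly log2(n) rounds rather than by one thread issuing all n wake-ups.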

    //! Try to add t to the list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    virtual ~private_server();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) override {
        for( std::size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed( int ) override { __TBB_ASSERT(false, NULL); }

    unsigned default_concurrency() const override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) override;

#if _WIN32||_WIN64
    void register_external_thread ( ::rml::server::execution_resource_t& ) override {}
    void unregister_external_thread ( ::rml::server::execution_resource_t ) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
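
// A note on AVOID_64K_ALIASING above, offered as general background rather than
// a claim about any specific target CPU: if many worker stacks start at addresses
// that coincide modulo 64K, the threads' hot stack locations compete for the same
// cache sets. The macro offsets each stack by an index-dependent amount (the
// 'sink_for_alloca' variable mentioned in the warning suppression) to spread them out.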

void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
    state_t expected_state = my_state.load(std::memory_order_acquire);
    __TBB_ASSERT( expected_state!=st_quit, NULL );

    while( !my_state.compare_exchange_strong( expected_state, st_quit ) );

    if( expected_state==st_normal || expected_state==st_starting ) {
        // May have invalidated the invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in that case the thread was not started yet.
        // For st_starting, the release is done at the launch site.
        if (expected_state==st_normal)
            release_handle(my_handle, governor::does_client_join_workers(my_client));
    } else if( expected_state==st_init ) {
        // Perform the action that otherwise would be performed by the associated thread when it quits.
        my_server.remove_server_ref();
    }
}

void private_worker::run() noexcept {
    my_server.propagate_chain_reaction();

    // Transitioning to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
            my_client.process(j);
        } else {
            thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
                my_thread_monitor.commit_wait(c);
                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}
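
// A note on the wait protocol in run() above: prepare_wait/commit_wait/cancel_wait
// form a two-phase wait. The worker first registers its intent to sleep, then
// re-checks the predicate (my_state!=st_quit plus insertion into the asleep list),
// and only then blocks; if the re-check fails, it cancels the wait. This closes
// the window in which a notify() issued between the last check and the block
// would otherwise be lost.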

inline void private_worker::wake_or_launch() {
    state_t expected_state = st_init;
    if( my_state.compare_exchange_strong( expected_state, st_starting ) ) {
        // After this point, remove_server_ref() must be done by the created thread.
#if __TBB_USE_WINAPI
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif __TBB_USE_POSIX
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            // Implicit destruction of fpa restores the original affinity mask.
        }
#endif /* __TBB_USE_POSIX */
        expected_state = st_starting;
        if ( !my_state.compare_exchange_strong( expected_state, st_normal ) ) {
            // Shutdown was requested during startup. my_handle cannot be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( expected_state==st_quit, NULL );
            release_handle(my_handle, governor::does_client_join_workers(my_client));
        }
    }
    else {
        __TBB_ASSERT( !my_next, "Should not wake a thread while it is still in the asleep list" );
        my_thread_monitor.notify();
    }
}

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_slack(0),
    my_ref_count(my_n_thread+1),
    my_thread_array(NULL),
    my_asleep_list_root(NULL)
#if TBB_USE_ASSERT
    , my_net_slack_requests(0)
#endif /* TBB_USE_ASSERT */
{
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
    for( std::size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        t->my_next = my_asleep_list_root.exchange(t, std::memory_order_relaxed);
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, NULL );
    for( std::size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under the lock so that if another thread takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.exchange(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, NULL );
    private_worker* wakee[2];
    private_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                // The additional demand does not exceed the surplus supply.
                if ( additional_slack+my_slack.load(std::memory_order_acquire)<=0 )
                    break;
                --additional_slack;
            } else {
                // Chain reaction; try to claim a unit of slack.
                int old = my_slack;
                do {
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong(old,old-1) );
            }
            // Pop a sleeping worker to combine with the claimed unit of slack.
            auto old = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
            *w++ = old;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = NULL;
        ww->wake_or_launch();
    }
}

void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests += delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack += delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}
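
// Usage sketch, for illustration only: 'my_concrete_client' stands for some
// tbb_client implementation and is not defined in this file.
//
//   rml::tbb_server* server = rml::make_private_server( my_concrete_client );
//   server->adjust_job_count_estimate( 4 );     // raise my_slack; wake or lazily launch workers
//   // ... workers call my_concrete_client.process(job) while slack permits ...
//   server->request_close_connection( false );  // begin shutdown; the server frees itself
//                                               // after the last worker releases its reference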

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}

} // namespace rml
} // namespace r1
} // namespace detail
} // namespace tbb