/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/cache_aligned_allocator.h"
#include "oneapi/tbb/mutex.h"

#include "rml_tbb.h"
#include "rml_thread_monitor.h"

#include "scheduler_common.h"
#include "governor.h"
#include "misc.h"

#include <atomic>


namespace tbb {
namespace detail {
namespace r1 {
namespace rml {

using rml::internal::thread_monitor;
typedef thread_monitor::handle_type thread_handle;

class private_server;

class private_worker: no_copy {
private:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
        init --> starting --> normal
          |         |           |
          |         V           |
          \------> quit <------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const std::size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run() noexcept;

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
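    /** Moves the state machine to st_quit from whatever state it is currently in.
        If no thread was ever launched (st_init), the caller also performs the cleanup
        that the associated thread would otherwise have done. */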
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle my_handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const std::size_t i ) :
        my_state(st_init), my_server(server), my_client(client), my_index(i),
        my_handle(), my_next()
    {}
};

static const std::size_t cache_line_size = tbb::detail::max_nfs_size;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const std::size_t i )
        : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const std::size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef mutex asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_asleep_list_root.load(std::memory_order_relaxed) )
            wake_some(0);
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
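    /** Called with additional_slack==0 from propagate_chain_reaction(), and with a
        positive value from adjust_job_count_estimate() when the job count estimate grows. */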
    void wake_some( int additional_slack );

    ~private_server() override;

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) override {
        for( std::size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed( int ) override { __TBB_ASSERT(false, nullptr); }

    unsigned default_concurrency() const override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) override;

#if _WIN32 || _WIN64
    void register_external_thread ( ::rml::server::execution_resource_t& ) override {}
    void unregister_external_thread ( ::rml::server::execution_resource_t ) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    // return 0 instead of nullptr due to the difference in the type __RML_DECL_THREAD_ROUTINE on various OSs
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
    // The state can only transition in one direction: st_init -> st_starting -> st_normal.
    // So we do not need more than three CAS attempts.
    state_t expected_state = my_state.load(std::memory_order_relaxed);
    __TBB_ASSERT(expected_state != st_quit, "The quit state is expected to be set only once");
    if (!my_state.compare_exchange_strong(expected_state, st_quit)) {
        __TBB_ASSERT(expected_state == st_starting || expected_state == st_normal, "We failed once so the init state is not expected");
        if (!my_state.compare_exchange_strong(expected_state, st_quit)) {
            __TBB_ASSERT(expected_state == st_normal, "We failed twice so only the normal state is expected");
            bool res = my_state.compare_exchange_strong(expected_state, st_quit);
            __TBB_ASSERT_EX(res, "We cannot fail in the normal state");
        }
    }

    if( expected_state==st_normal || expected_state==st_starting ) {
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // There is no need to release the handle in the st_init state,
        // because in this case the thread wasn't started yet.
        // For st_starting, release is done at the launch site.
        if (expected_state==st_normal)
            release_handle(my_handle, governor::does_client_join_workers(my_client));
    } else if( expected_state==st_init ) {
        // Perform action that otherwise would be performed by associated thread when it quits.
        my_server.remove_server_ref();
    }
}

void private_worker::run() noexcept {
    my_server.propagate_chain_reaction();

    // Transitioning to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
            my_client.process(j);
        } else {
            thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            // We need memory_order_seq_cst to enforce ordering with prepare_wait
            // (note that a store in prepare_wait should be with memory_order_seq_cst as well)
            if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
                my_thread_monitor.commit_wait(c);
                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}

inline void private_worker::wake_or_launch() {
    state_t expected_state = st_init;
    if( my_state.compare_exchange_strong( expected_state, st_starting ) ) {
        // after this point, remove_server_ref() must be done by the created thread
#if __TBB_USE_WINAPI
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif __TBB_USE_POSIX
        {
            affinity_helper fpa;
            fpa.protect_affinity_mask( /*restore_process_mask=*/true );
            my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
            // Implicit destruction of fpa resets original affinity mask.
        }
#endif /* __TBB_USE_POSIX */
        expected_state = st_starting;
        if ( !my_state.compare_exchange_strong( expected_state, st_normal ) ) {
            // Do shutdown during startup. my_handle can't be released
            // by start_shutdown, because my_handle value might be not set yet
            // at time of transition from st_starting to st_quit.
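            // Therefore the launching thread releases the handle itself here,
            // now that the launch above has stored a valid value into my_handle.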
            __TBB_ASSERT( expected_state==st_quit, nullptr);
            release_handle(my_handle, governor::does_client_join_workers(my_client));
        }
    }
    else {
        __TBB_ASSERT( !my_next, "Should not wake a thread while it's still in asleep list" );
        my_thread_monitor.notify();
    }
}

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_slack(0),
    my_ref_count(my_n_thread+1),
    my_thread_array(nullptr),
    my_asleep_list_root(nullptr)
#if TBB_USE_ASSERT
    , my_net_slack_requests(0)
#endif /* TBB_USE_ASSERT */
{
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
    for( std::size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        t->my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(t, std::memory_order_relaxed);
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, nullptr);
    for( std::size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under lock so that if another takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    auto expected = my_slack.load(std::memory_order_relaxed);
    while (expected < 0) {
        if (my_slack.compare_exchange_strong(expected, expected + 1)) {
            t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(&t, std::memory_order_relaxed);
            return true;
        }
    }

    return false;
}

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    private_worker* wakee[2];
    private_worker** w = wakee;

    if (additional_slack) {
        // Contribute our unused slack to my_slack.
        my_slack += additional_slack;
    }

    int allotted_slack = 0;
    while (allotted_slack < 2) {
        // Chain reaction; try to claim unit of slack
        int old = my_slack.load(std::memory_order_relaxed);
        do {
            if (old <= 0) goto done;
        } while (!my_slack.compare_exchange_strong(old, old - 1));
        ++allotted_slack;
    }
done:

    if (allotted_slack)
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);

        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 && allotted_slack ) {
            --allotted_slack;
            // Pop sleeping worker to combine with claimed unit of slack
            auto old = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
            *w++ = old;
        }
        if( allotted_slack ) {
            // Contribute our unused slack to my_slack.
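            // (fewer sleeping workers were available than units of slack claimed)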
            my_slack += allotted_slack;
        }
    }
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = nullptr;
        ww->wake_or_launch();
    }
}

void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack+=delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}

} // namespace rml
} // namespace r1
} // namespace detail
} // namespace tbb
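
/* Usage sketch (illustrative only, not part of the library): a caller that implements the
   rml::tbb_client interface obtains a server with make_private_server(client), raises or
   lowers the demand for worker threads through adjust_job_count_estimate(), and finally
   calls request_close_connection(); the server destroys itself once every worker has
   released its reference and the client has acknowledged the close of the connection. */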