/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/cache_aligned_allocator.h"
#include "oneapi/tbb/mutex.h"

#include "rml_tbb.h"
#include "rml_thread_monitor.h"

#include "scheduler_common.h"
#include "governor.h"
#include "misc.h"

#include <atomic>


namespace tbb {
namespace detail {
namespace r1 {
namespace rml {

using rml::internal::thread_monitor;
typedef thread_monitor::handle_type thread_handle;

class private_server;

class private_worker: no_copy {
private:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
            init --> starting --> normal
              |          |          |
              |          V          |
              \------> quit <-------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const std::size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run() noexcept;

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle my_handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const std::size_t i ) :
        my_state(st_init), my_server(server), my_client(client), my_index(i),
        my_handle(), my_next()
    {}
};

static const std::size_t cache_line_size = tbb::detail::max_nfs_size;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about uninstantiable class
#pragma warning(push)
#pragma warning(disable:4510 4610)
#endif
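//! A private_worker padded out to a whole number of cache lines.
/** Keeps adjacent workers in my_thread_array on separate cache lines to avoid false sharing. */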
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const std::size_t i )
        : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const std::size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef mutex asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom. Second test is inside wake_some(0).
        if( my_asleep_list_root.load(std::memory_order_relaxed) )
            wake_some(0);
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    ~private_server() override;

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) override {
        for( std::size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed( int ) override {__TBB_ASSERT(false, nullptr);}

    unsigned default_concurrency() const override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) override;

#if _WIN32 || _WIN64
    void register_external_thread ( ::rml::server::execution_resource_t& ) override {}
    void unregister_external_thread ( ::rml::server::execution_resource_t ) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
// Suppress overzealous compiler warnings about the variable 'sink_for_alloca' being initialized but not referenced
#pragma warning(push)
#pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    // Return 0 instead of nullptr because the return type of __RML_DECL_THREAD_ROUTINE differs across OSes.
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning(pop)
#endif

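//! Join or detach the worker's OS thread, depending on whether the client joins workers.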
void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
    __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != st_quit, "The quit state is expected to be set only once");

    // `acq` to acquire my_handle
    // `rel` to release market state
    state_t prev_state = my_state.exchange(st_quit, std::memory_order_acq_rel);

    if (prev_state == st_init) {
        // Perform the action that otherwise would be performed by the associated thread when it quits.
        my_server.remove_server_ref();
    } else {
        __TBB_ASSERT(prev_state == st_normal || prev_state == st_starting, nullptr);
        // May have invalidated invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // No need to release the handle in the st_init state,
        // because in that case the thread was not started yet.
        // For st_starting, release is done at the launch site.
        if (prev_state == st_normal)
            release_handle(my_handle, governor::does_client_join_workers(my_client));
    }
}

void private_worker::run() noexcept {
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
        if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
            my_client.process(j);
        } else if( my_server.try_insert_in_asleep_list(*this) ) {
            my_thread_monitor.wait();
            __TBB_ASSERT(my_state.load(std::memory_order_relaxed) == st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
            my_server.propagate_chain_reaction();
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}

inline void private_worker::wake_or_launch() {
    state_t state = my_state.load(std::memory_order_relaxed);

    switch (state) {
    case st_starting:
        __TBB_fallthrough;
    case st_normal:
        __TBB_ASSERT(!my_next, "Should not wake a thread while it's still in the asleep list");
        my_thread_monitor.notify();
        break;
    case st_init:
        if (my_state.compare_exchange_strong(state, st_starting)) {
            // After this point, remove_server_ref() must be done by the created thread.
#if __TBB_USE_WINAPI
            // Windows thread_monitor::launch is designed on the assumption that worker thread ids
            // go from 1 to the hard limit set by TBB market::global_market.
            const std::size_t worker_idx = my_server.my_n_thread - this->my_index;
            my_handle = thread_monitor::launch(thread_routine, this, my_server.my_stack_size, &worker_idx);
#elif __TBB_USE_POSIX
            {
                affinity_helper fpa;
                fpa.protect_affinity_mask( /*restore_process_mask=*/true);
                my_handle = thread_monitor::launch(thread_routine, this, my_server.my_stack_size);
                // Implicit destruction of fpa restores the original affinity mask.
            }
#endif /* __TBB_USE_POSIX */
            state = st_starting;
            if (!my_state.compare_exchange_strong(state, st_normal)) {
                // Shutdown was requested during startup. my_handle can't be released
                // by start_shutdown, because the my_handle value might not be set yet
                // at the time of the transition from st_starting to st_quit.
                __TBB_ASSERT(state == st_quit, nullptr);
                release_handle(my_handle, governor::does_client_join_workers(my_client));
            }
        }
        break;
    default:
        __TBB_ASSERT(state == st_quit, nullptr);
    }
}

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_slack(0),
    my_ref_count(my_n_thread+1),
    my_thread_array(nullptr),
    my_asleep_list_root(nullptr)
#if TBB_USE_ASSERT
    , my_net_slack_requests(0)
#endif /* TBB_USE_ASSERT */
{
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
    // All workers start on the asleep list; their OS threads are launched lazily by wake_or_launch().
    for( std::size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        t->my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(t, std::memory_order_relaxed);
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, nullptr);
    for( std::size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under the lock so that if another thread takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    auto expected = my_slack.load(std::memory_order_relaxed);
    while (expected < 0) {
        if (my_slack.compare_exchange_strong(expected, expected + 1)) {
            t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(&t, std::memory_order_relaxed);
            return true;
        }
    }

    return false;
}

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    private_worker* wakee[2];
    private_worker** w = wakee;

    if (additional_slack) {
        // Contribute the additional slack to my_slack.
        my_slack += additional_slack;
    }

    int allotted_slack = 0;
    while (allotted_slack < 2) {
        // Chain reaction: try to claim a unit of slack.
        int old = my_slack.load(std::memory_order_relaxed);
        do {
            if (old <= 0) goto done;
        } while (!my_slack.compare_exchange_strong(old, old - 1));
        ++allotted_slack;
    }
done:

    if (allotted_slack) {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        auto root = my_asleep_list_root.load(std::memory_order_relaxed);
        while( root && w<wakee+2 && allotted_slack ) {
            --allotted_slack;
            // Pop a sleeping worker to combine with the claimed unit of slack.
            *w++ = root;
            root = root->my_next;
        }
        my_asleep_list_root.store(root, std::memory_order_relaxed);
        if( allotted_slack ) {
            // Contribute our unused slack back to my_slack.
            my_slack += allotted_slack;
        }
    }
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = nullptr;
        ww->wake_or_launch();
    }
}

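//! Adjust the estimate of the number of jobs that can use a thread.
/** A negative delta simply lowers my_slack; a positive delta is propagated through
    wake_some(), which may wake or launch sleeping workers. */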
void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack+=delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}

} // namespace rml
} // namespace r1
} // namespace detail
} // namespace tbb