xref: /oneTBB/src/tbb/private_server.cpp (revision c21e688a)
/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/cache_aligned_allocator.h"
#include "oneapi/tbb/mutex.h"

#include "rml_tbb.h"
#include "rml_thread_monitor.h"

#include "scheduler_common.h"
#include "governor.h"
#include "misc.h"

#include <atomic>


namespace tbb {
namespace detail {
namespace r1 {
namespace rml {

using rml::internal::thread_monitor;
typedef thread_monitor::handle_type thread_handle;

class private_server;

class private_worker: no_copy {
private:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
        init --> starting --> normal
          |         |           |
          |         V           |
          \------> quit <------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has an associated thread that is starting up.
        st_starting,
        //! The associated thread is executing its normal life sequence.
        st_normal,
        //! The associated thread has ended its normal life sequence and promises never to touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    const std::size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run() noexcept;

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle my_handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const std::size_t i ) :
        my_state(st_init), my_server(server), my_client(client), my_index(i),
        my_handle(), my_next()
    {}
};

static const std::size_t cache_line_size = tbb::detail::max_nfs_size;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
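//! private_worker padded out to a full cache line.
/** Workers are allocated contiguously in private_server::my_thread_array, so the
    padding keeps independently updated workers on separate cache lines and avoids
    false sharing between them. */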
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const std::size_t i )
    : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so the maximum might not actually be reached. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const std::size_t my_stack_size;

    //! Number of jobs that could use their associated thread, minus the number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
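    /** Initialized to my_n_thread+1 in the constructor: one reference per worker
        (released when the worker quits, or in start_shutdown() if its thread was
        never started) plus one released by request_close_connection(). */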
    std::atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef mutex asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom.  Second test is inside wake_some(0).
        if( my_asleep_list_root.load(std::memory_order_relaxed) )
            wake_some(0);
    }

    //! Try to add t to the list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    ~private_server() override;

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) override {
        for( std::size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed( int ) override {__TBB_ASSERT(false, nullptr);}

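    //! Default number of worker threads.
    /** One less than governor::default_num_threads(), leaving a slot for the external (application) thread. */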
    unsigned default_concurrency() const override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) override;

#if _WIN32 || _WIN64
    void register_external_thread ( ::rml::server::execution_resource_t& ) override {}
    void unregister_external_thread ( ::rml::server::execution_resource_t ) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about the initialized but unreferenced variable 'sink_for_alloca'
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that the stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    // Return 0 instead of nullptr because the return type of __RML_DECL_THREAD_ROUTINE differs across OSes.
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
    // The state can transition only in one direction: st_init -> st_starting -> st_normal.
    // So we do not need more than three CAS attempts.
    state_t expected_state = my_state.load(std::memory_order_relaxed);
    __TBB_ASSERT(expected_state != st_quit, "The quit state is expected to be set only once");
    if (!my_state.compare_exchange_strong(expected_state, st_quit)) {
        __TBB_ASSERT(expected_state == st_starting || expected_state == st_normal, "We failed once so the init state is not expected");
        if (!my_state.compare_exchange_strong(expected_state, st_quit)) {
            __TBB_ASSERT(expected_state == st_normal, "We failed twice so only the normal state is expected");
            bool res = my_state.compare_exchange_strong(expected_state, st_quit);
            __TBB_ASSERT_EX(res, "We cannot fail in the normal state");
        }
    }

    if( expected_state==st_normal || expected_state==st_starting ) {
        // May have invalidated the invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // It does not matter, because my_state==st_quit overrides checking of my_slack.
        my_thread_monitor.notify();
        // The handle does not need to be released in the st_init state,
        // because in that case the thread was never started.
        // For st_starting, the release is done at the launch site.
        if (expected_state==st_normal)
            release_handle(my_handle, governor::does_client_join_workers(my_client));
    } else if( expected_state==st_init ) {
        // Perform the action that would otherwise be performed by the associated thread when it quits.
        my_server.remove_server_ref();
    }
}

void private_worker::run() noexcept {
    my_server.propagate_chain_reaction();

    // Transitioning to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complicate handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
            my_client.process(j);
        } else {
            thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            // We need memory_order_seq_cst to enforce ordering with prepare_wait
            // (note that a store in prepare_wait should be with memory_order_seq_cst as well)
            if( my_state.load(std::memory_order_seq_cst)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
                my_thread_monitor.commit_wait(c);
                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}

inline void private_worker::wake_or_launch() {
    state_t expected_state = st_init;
    if( my_state.compare_exchange_strong( expected_state, st_starting ) ) {
        // After this point, remove_server_ref() must be done by the created thread.
#if __TBB_USE_WINAPI
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif __TBB_USE_POSIX
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        // Implicit destruction of fpa restores the original affinity mask.
        }
#endif /* __TBB_USE_POSIX */
        expected_state = st_starting;
        if ( !my_state.compare_exchange_strong( expected_state, st_normal ) ) {
            // Shutdown was requested during startup. my_handle cannot be released
            // by start_shutdown, because its value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( expected_state==st_quit, nullptr);
            release_handle(my_handle, governor::does_client_join_workers(my_client));
        }
    }
    else {
        __TBB_ASSERT( !my_next, "Should not wake a thread while it's still in asleep list" );
        my_thread_monitor.notify();
    }
}

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_slack(0),
    my_ref_count(my_n_thread+1),
    my_thread_array(nullptr),
    my_asleep_list_root(nullptr)
#if TBB_USE_ASSERT
    , my_net_slack_requests(0)
#endif /* TBB_USE_ASSERT */
{
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
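    // All workers start on the asleep list with no OS thread attached;
    // threads are launched lazily by wake_or_launch() when slack becomes available.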
    for( std::size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        t->my_next = my_asleep_list_root.load(std::memory_order_relaxed);
        my_asleep_list_root.store(t, std::memory_order_relaxed);
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, nullptr);
    for( std::size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under lock so that if another thread takes that unit of slack,
    // it sees us sleeping on the list and wakes us up.
    auto expected = my_slack.load(std::memory_order_relaxed);
    while (expected < 0) {
        if (my_slack.compare_exchange_strong(expected, expected + 1)) {
            t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(&t, std::memory_order_relaxed);
            return true;
        }
    }

    return false;
}

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
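    // At most two workers are woken per call; each of them wakes up to two more
    // via propagate_chain_reaction(), so the pool ramps up without any single
    // thread having to pay for all the wake-ups.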
    private_worker* wakee[2];
    private_worker** w = wakee;

    if (additional_slack) {
        // Add the caller's additional slack to my_slack.
        my_slack += additional_slack;
    }

    int allotted_slack = 0;
    while (allotted_slack < 2) {
        // Chain reaction: try to claim a unit of slack.
        int old = my_slack.load(std::memory_order_relaxed);
        do {
            if (old <= 0) goto done;
        } while (!my_slack.compare_exchange_strong(old, old - 1));
        ++allotted_slack;
    }
done:

    if (allotted_slack)
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);

        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 && allotted_slack) {
            --allotted_slack;
            // Pop a sleeping worker to pair with a claimed unit of slack.
            auto old = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
            *w++ = old;
        }
        if(allotted_slack) {
            // Return any slack we claimed but could not use to my_slack.
            my_slack += allotted_slack;
        }
    }
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = nullptr;
        ww->wake_or_launch();
    }
}

void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack+=delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}
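
// A minimal usage sketch (illustrative only; in oneTBB the client and the calls below
// come from the scheduler layer, and the client shown here is hypothetical):
//
//     tbb_client& client = ...;                 // supplies max_job_count(), process(), cleanup(), ...
//     tbb_server* server = make_private_server(client);
//     server->adjust_job_count_estimate(4);     // request up to 4 workers; threads start lazily
//     /* ... client.process(job) runs on worker threads while my_slack is non-negative ... */
//     server->request_close_connection(false);  // shuts workers down; the server deletes itself
//                                               // once every worker has released its reference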

} // namespace rml
} // namespace r1
} // namespace detail
} // namespace tbb