xref: /oneTBB/src/tbb/private_server.cpp (revision b15aabb3)
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "oneapi/tbb/cache_aligned_allocator.h"

#include "rml_tbb.h"
#include "rml_thread_monitor.h"

#include "scheduler_common.h"
#include "governor.h"
#include "misc.h"

#include <atomic>


namespace tbb {
namespace detail {
namespace r1 {
namespace rml {

using rml::internal::thread_monitor;
typedef thread_monitor::handle_type thread_handle;

class private_server;

class private_worker: no_copy {
private:
    //! State in the finite-state machine that controls the worker.
    /** State diagram:
        init --> starting --> normal
          |         |           |
          |         V           |
          \------> quit <------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has an associated thread that is starting up.
        st_starting,
        //! Associated thread is doing its normal life sequence.
        st_normal,
        //! Associated thread has ended its normal life sequence and promises never to touch *this again.
        st_quit
    };
    std::atomic<state_t> my_state;
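
    // The transitions in the diagram above are driven entirely by
    // compare-and-swap operations on my_state. A rough sketch of the three
    // writers (simplified; the actual call sites are below):
    //
    //     state_t e = st_init;
    //     my_state.compare_exchange_strong(e, st_starting); // wake_or_launch: init -> starting
    //     e = st_starting;
    //     my_state.compare_exchange_strong(e, st_normal);   // wake_or_launch: starting -> normal
    //     /* start_shutdown: any state -> quit, via a CAS loop */
    //
    // Because st_quit can be installed from any state, every other transition
    // must tolerate losing its CAS to a concurrent shutdown.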

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! Index used for avoiding the 64K aliasing problem
    const std::size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run() noexcept;

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    static void release_handle(thread_handle my_handle, bool join);

protected:
    private_worker( private_server& server, tbb_client& client, const std::size_t i ) :
        my_state(st_init), my_server(server), my_client(client), my_index(i),
        my_thread_monitor(), my_handle(), my_next()
    {}
};

static const std::size_t cache_line_size = tbb::detail::max_nfs_size;

#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const std::size_t i )
    : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
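
// A note on the padding arithmetic above: pad rounds each worker up to a
// multiple of the cache line size so that adjacent workers in the server's
// thread array never share a line (avoiding false sharing). Worked example
// with assumed sizes (real values vary by platform): if cache_line_size is
// 128 and sizeof(private_worker) is 120, then 120%128 == 120 and the array is
// pad[8], giving sizeof(padded_private_worker) == 128. If the size were
// already an exact multiple, the expression would yield a full extra cache
// line of padding rather than a zero-length (ill-formed) array.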

class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so the maximum might not actually be reached. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const std::size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads. */
    std::atomic<int> my_slack;
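
    // Example of the slack accounting (assumed numbers, purely illustrative):
    // suppose 4 workers are running and the client lowers its job count
    // estimate by 2. adjust_job_count_estimate(-2) drops my_slack to -2; the
    // next two workers to observe my_slack<0 in run() park themselves on the
    // sleep list, each one incrementing my_slack back toward 0 under
    // my_asleep_list_mutex (see try_insert_in_asleep_list).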

    //! Counter used to determine when to delete this.
    std::atomic<int> my_ref_count;

    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    std::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef scheduler_mutex_type asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if any are sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom.  Second test is inside wake_some(0).
        if( my_asleep_list_root.load(std::memory_order_acquire) )
            wake_some(0);
    }
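
    // The fan-out lets a large wakeup complete in logarithmically many rounds:
    // with, say, 15 sleeping workers and enough slack for all of them (assumed
    // numbers, for illustration), the caller wakes 2, those wake up to 4, then
    // 8, and so on -- roughly log2(N) rounds instead of one thread issuing N
    // sequential notifies.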

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    virtual ~private_server();

    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }
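
    // Reference-count accounting: the constructor initializes my_ref_count to
    // my_n_thread+1 -- one reference per (potential) worker thread plus one
    // for the connection itself. Each worker drops its reference when it
    // quits in run() (or start_shutdown() drops it on the worker's behalf if
    // the thread was never started), and request_close_connection() drops the
    // last one, so the destructor runs exactly once.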

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const override {
        return 0;
    }

    void request_close_connection( bool /*exiting*/ ) override {
        for( std::size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    void independent_thread_number_changed( int ) override {__TBB_ASSERT(false,NULL);}

    unsigned default_concurrency() const override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) override;

#if _WIN32||_WIN64
    void register_external_thread ( ::rml::server::execution_resource_t& ) override {}
    void unregister_external_thread ( ::rml::server::execution_resource_t ) override {}
#endif /* _WIN32||_WIN64 */
};

//------------------------------------------------------------------------
// Methods of private_worker
//------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 && __GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif

void private_worker::release_handle(thread_handle handle, bool join) {
    if (join)
        thread_monitor::join(handle);
    else
        thread_monitor::detach_thread(handle);
}

void private_worker::start_shutdown() {
    state_t expected_state = my_state.load(std::memory_order_acquire);
    __TBB_ASSERT( expected_state!=st_quit, NULL );

    while( !my_state.compare_exchange_strong( expected_state, st_quit ) );

    if( expected_state==st_normal || expected_state==st_starting ) {
        // May have invalidated the invariant for sleeping, so wake up the thread.
        // Note that the notify() here occurs without maintaining invariants for my_slack.
        // That does not matter, because my_state==st_quit overrides the check of my_slack.
        my_thread_monitor.notify();
        // There is no need to release the handle in the st_init state,
        // because in that case the thread was never started.
        // For st_starting, the release is done at the launch site.
        if (expected_state==st_normal)
            release_handle(my_handle, governor::does_client_join_workers(my_client));
    } else if( expected_state==st_init ) {
        // Perform the action that would otherwise be performed by the associated thread when it quits.
        my_server.remove_server_ref();
    }
}

void private_worker::run() noexcept {
    my_server.propagate_chain_reaction();

    // Transitioning to st_normal here would require setting my_handle,
    // which would create a race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    while( my_state.load(std::memory_order_acquire)!=st_quit ) {
        if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
            my_client.process(j);
        } else {
            thread_monitor::cookie c;
            // Prepare to wait
            my_thread_monitor.prepare_wait(c);
            // Check/set the invariant for sleeping
            if( my_state.load(std::memory_order_acquire)!=st_quit && my_server.try_insert_in_asleep_list(*this) ) {
                my_thread_monitor.commit_wait(c);
                __TBB_ASSERT( my_state==st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
                my_server.propagate_chain_reaction();
            } else {
                // Invariant broken
                my_thread_monitor.cancel_wait();
            }
        }
    }
    my_client.cleanup(j);

    ++my_server.my_slack;
    my_server.remove_server_ref();
}
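
// The wait in run() follows thread_monitor's prepare/commit protocol, a
// classic two-phase wait. A minimal sketch of the pattern, with a
// hypothetical predicate_still_false() standing in for the real checks:
//
//     thread_monitor::cookie c;
//     monitor.prepare_wait(c);       // register interest before re-checking
//     if( predicate_still_false() )  // here: st_quit test + try_insert_in_asleep_list
//         monitor.commit_wait(c);    // blocks unless a notify() raced in after prepare
//     else
//         monitor.cancel_wait();     // invariant broken; do not block
//
// Re-checking between prepare and commit closes the window in which a
// notify() issued after the initial predicate test would otherwise be lost.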

inline void private_worker::wake_or_launch() {
    state_t expected_state = st_init;
    if( my_state.compare_exchange_strong( expected_state, st_starting ) ) {
        // After this point, remove_server_ref() must be done by the created thread.
#if __TBB_USE_WINAPI
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size, &this->my_index );
#elif __TBB_USE_POSIX
        {
        affinity_helper fpa;
        fpa.protect_affinity_mask( /*restore_process_mask=*/true );
        my_handle = thread_monitor::launch( thread_routine, this, my_server.my_stack_size );
        // Implicit destruction of fpa restores the original affinity mask.
        }
#endif /* __TBB_USE_POSIX */
        expected_state = st_starting;
        if ( !my_state.compare_exchange_strong( expected_state, st_normal ) ) {
            // Shutdown happened during startup. my_handle can't be released
            // by start_shutdown, because the my_handle value might not be set yet
            // at the time of the transition from st_starting to st_quit.
            __TBB_ASSERT( expected_state==st_quit, NULL );
            release_handle(my_handle, governor::does_client_join_workers(my_client));
        }
    }
    else {
        __TBB_ASSERT( !my_next, "Should not wake a thread while it's still in the asleep list" );
        my_thread_monitor.notify();
    }
}
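
// Launching is lazy: a worker's OS thread is created only by the first
// wake_or_launch() that finds it in st_init, so a server sized for N workers
// that never sees enough demand may create fewer than N threads. The second
// CAS (st_starting -> st_normal) is what detects a shutdown that raced with
// the launch: if start_shutdown() installed st_quit first, the launching
// thread is the only one holding a valid my_handle, so it releases the
// handle itself.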

//------------------------------------------------------------------------
// Methods of private_server
//------------------------------------------------------------------------
private_server::private_server( tbb_client& client ) :
    my_client(client),
    my_n_thread(client.max_job_count()),
    my_stack_size(client.min_stack_size()),
    my_slack(0),
    my_ref_count(my_n_thread+1),
    my_thread_array(NULL),
    my_asleep_list_root(NULL)
#if TBB_USE_ASSERT
    , my_net_slack_requests(0)
#endif /* TBB_USE_ASSERT */
{
    my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
    for( std::size_t i=0; i<my_n_thread; ++i ) {
        private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
        t->my_next = my_asleep_list_root.exchange(t, std::memory_order_relaxed);
    }
}

private_server::~private_server() {
    __TBB_ASSERT( my_net_slack_requests==0, NULL );
    for( std::size_t i=my_n_thread; i--; )
        my_thread_array[i].~padded_private_worker();
    tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
    tbb::detail::poison_pointer( my_thread_array );
}

inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
    asleep_list_mutex_type::scoped_lock lock;
    if( !lock.try_acquire(my_asleep_list_mutex) )
        return false;
    // Contribute to slack under the lock, so that a thread taking that unit of slack
    // sees us sleeping on the list and wakes us up.
    int k = ++my_slack;
    if( k<=0 ) {
        t.my_next = my_asleep_list_root.exchange(&t, std::memory_order_relaxed);
        return true;
    } else {
        --my_slack;
        return false;
    }
}
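
// Worked example (assumed values): with my_slack==-1, a parking worker bumps
// it to 0 and sleeps; its own +1 is the unit a waker must later claim back,
// since wake_some() decrements my_slack for each worker it pops off the list.
// With my_slack==0 the bump would yield +1, i.e. there is work available, so
// the worker undoes the increment and returns to the processing loop instead
// of sleeping.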

void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, NULL );
    private_worker* wakee[2];
    private_worker** w = wakee;
    {
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        while( my_asleep_list_root.load(std::memory_order_relaxed) && w<wakee+2 ) {
            if( additional_slack>0 ) {
                // Stop if additional demand does not exceed the surplus supply.
                if ( additional_slack+my_slack.load(std::memory_order_acquire)<=0 )
                    break;
                --additional_slack;
            } else {
                // Chain reaction: try to claim a unit of slack.
                int old = my_slack;
                do {
                    if( old<=0 ) goto done;
                } while( !my_slack.compare_exchange_strong(old,old-1) );
            }
            // Pop a sleeping worker to combine with the claimed unit of slack.
            auto old = my_asleep_list_root.load(std::memory_order_relaxed);
            my_asleep_list_root.store(old->my_next, std::memory_order_relaxed);
            *w++ = old;
        }
        if( additional_slack ) {
            // Contribute our unused slack to my_slack.
            my_slack += additional_slack;
        }
    }
done:
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = NULL;
        ww->wake_or_launch();
    }
}
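
// Note the two-phase shape of wake_some(): workers are popped from the list
// while my_asleep_list_mutex is held, but notify()/thread launch happen only
// after the lock is released, keeping the critical section short. Waking at
// most two workers per call, combined with propagate_chain_reaction() in
// private_worker::run(), spreads the remaining wakeups across the freshly
// woken workers instead of serializing them on one thread.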

void private_server::adjust_job_count_estimate( int delta ) {
#if TBB_USE_ASSERT
    my_net_slack_requests+=delta;
#endif /* TBB_USE_ASSERT */
    if( delta<0 ) {
        my_slack+=delta;
    } else if( delta>0 ) {
        wake_some( delta );
    }
}

//! Factory method called from task.cpp to create a private_server.
tbb_server* make_private_server( tbb_client& client ) {
    return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
}

} // namespace rml
} // namespace r1
} // namespace detail
} // namespace tbb