xref: /oneTBB/src/tbb/private_server.cpp (revision 0a521127)
151c0b2f7Stbbdev /*
2c21e688aSSergey Zheltov     Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1749e08aacStbbdev #include "oneapi/tbb/cache_aligned_allocator.h"
184523a761Stbbdev #include "oneapi/tbb/mutex.h"
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev #include "rml_tbb.h"
2151c0b2f7Stbbdev #include "rml_thread_monitor.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "scheduler_common.h"
2451c0b2f7Stbbdev #include "governor.h"
2551c0b2f7Stbbdev #include "misc.h"
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev #include <atomic>
2851c0b2f7Stbbdev 
2951c0b2f7Stbbdev 
3051c0b2f7Stbbdev namespace tbb {
3151c0b2f7Stbbdev namespace detail {
3251c0b2f7Stbbdev namespace r1 {
3351c0b2f7Stbbdev namespace rml {
3451c0b2f7Stbbdev 
3551c0b2f7Stbbdev using rml::internal::thread_monitor;
3651c0b2f7Stbbdev typedef thread_monitor::handle_type thread_handle;
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev class private_server;
3951c0b2f7Stbbdev 
//! One worker slot owned by private_server.
/** The OS thread for a slot is launched lazily, on first wake-up, by wake_or_launch().
    NOTE: sizeof(private_worker) is used by padded_private_worker to compute its padding,
    so adding or reordering members changes the padded layout. */
class private_worker: no_copy {
private:
    //! State in finite-state machine that controls the worker.
    /** State diagram:
        init --> starting --> normal
          |         |           |
          |         V           |
          \------> quit <------/
      */
    enum state_t {
        //! *this is initialized
        st_init,
        //! *this has associated thread that is starting up.
        st_starting,
        //! Associated thread is doing normal life sequence.
        st_normal,
        //! Associated thread has ended normal life sequence and promises to never touch *this again.
        st_quit
    };
    //! Current FSM state; transitions are made only with atomic exchange / compare-exchange
    //! (see start_shutdown() and wake_or_launch()).
    std::atomic<state_t> my_state;

    //! Associated server
    private_server& my_server;

    //! Associated client
    tbb_client& my_client;

    //! index used for avoiding the 64K aliasing problem
    /** Equals this worker's position in the server's worker array (0..my_n_thread-1).
        On Windows it is also used to derive the worker id passed to thread_monitor::launch. */
    const std::size_t my_index;

    //! Monitor for sleeping when there is no work to do.
    /** The invariant that holds for sleeping workers is:
        "my_slack<=0 && my_state==st_normal && I am on server's list of asleep threads" */
    thread_monitor my_thread_monitor;

    //! Handle of the OS thread associated with this worker
    /** Assigned by wake_or_launch(); only meaningful once a thread has been launched. */
    thread_handle my_handle;

    //! Link for list of workers that are sleeping or have no associated thread.
    /** Reset to nullptr when the worker is popped from the list (see wake_some()). */
    private_worker* my_next;

    friend class private_server;

    //! Actions executed by the associated thread
    void run() noexcept;

    //! Wake up associated thread (or launch a thread if there is none)
    void wake_or_launch();

    //! Called by a thread (usually not the associated thread) to commence termination.
    void start_shutdown();

    //! OS thread entry point; arg is the private_worker that launched the thread.
    static __RML_DECL_THREAD_ROUTINE thread_routine( void* arg );

    //! Joins the thread if join is true, otherwise detaches it.
    static void release_handle(thread_handle my_handle, bool join);

protected:
    //! Protected: in this file it is constructed only as the base of padded_private_worker.
    private_worker( private_server& server, tbb_client& client, const std::size_t i ) :
        my_state(st_init), my_server(server), my_client(client), my_index(i),
        my_handle(), my_next()
    {}
};
10251c0b2f7Stbbdev 
10351c0b2f7Stbbdev static const std::size_t cache_line_size = tbb::detail::max_nfs_size;
10451c0b2f7Stbbdev 
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about uninstantiable class
    #pragma warning(push)
    #pragma warning(disable:4510 4610)
#endif
//! private_worker followed by padding so its size rounds up to a multiple of cache_line_size.
/** Combined with the cache-aligned array allocation in private_server, this keeps
    neighbouring workers apart. When sizeof(private_worker) is already a multiple of
    cache_line_size, the formula yields a full extra line of padding rather than a
    zero-length array (which would be ill-formed). */
class padded_private_worker: public private_worker {
    char pad[cache_line_size - sizeof(private_worker)%cache_line_size];
public:
    padded_private_worker( private_server& server, tbb_client& client, const std::size_t i )
    : private_worker(server,client,i) { suppress_unused_warning(pad); }
};
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
11951c0b2f7Stbbdev 
//! RML server that owns a fixed array of my_n_thread workers serving a single tbb_client.
/** Worker threads are launched lazily. The object destroys and deallocates itself when
    my_ref_count drops to zero (see remove_server_ref()). */
class private_server: public tbb_server, no_copy {
private:
    tbb_client& my_client;
    //! Maximum number of threads to be created.
    /** Threads are created lazily, so maximum might not actually be reached.
        NOTE: must stay declared before my_ref_count, whose initializer reads it. */
    const tbb_client::size_type my_n_thread;

    //! Stack size for each thread.
    const std::size_t my_stack_size;

    //! Number of jobs that could use their associated thread minus number of active threads.
    /** If negative, indicates oversubscription.
        If positive, indicates that more threads should run.
        Can be lowered asynchronously, but must be raised only while holding my_asleep_list_mutex,
        because raising it impacts the invariant for sleeping threads.
        NOTE(review): wake_some() and run() currently increment it without holding the
        mutex; confirm the sleeping-thread invariant still holds as intended. */
    std::atomic<int> my_slack;

    //! Counter used to determine when to delete this.
    /** Starts at my_n_thread+1. There is one reference per worker, dropped when its thread
        exits, or by start_shutdown() if no thread was ever launched. There is one more,
        dropped by request_close_connection(). */
    std::atomic<int> my_ref_count;

    //! Storage for my_n_thread workers; allocated and constructed in the constructor.
    padded_private_worker* my_thread_array;

    //! List of workers that are asleep or committed to sleeping until notified by another thread.
    /** Intrusive singly-linked list through private_worker::my_next. Initially holds every worker. */
    std::atomic<private_worker*> my_asleep_list_root;

    //! Protects my_asleep_list_root
    typedef mutex asleep_list_mutex_type;
    asleep_list_mutex_type my_asleep_list_mutex;

#if TBB_USE_ASSERT
    //! Debug-only running sum of adjust_job_count_estimate() deltas; must be zero at destruction.
    std::atomic<int> my_net_slack_requests;
#endif /* TBB_USE_ASSERT */

    //! Wake up to two sleeping workers, if there are any sleeping.
    /** The call is used to propagate a chain reaction where each thread wakes up two threads,
        which in turn each wake up two threads, etc. */
    void propagate_chain_reaction() {
        // First test of a double-check idiom.  Second test is inside wake_some(0).
        if( my_asleep_list_root.load(std::memory_order_relaxed) )
            wake_some(0);
    }

    //! Try to add t to list of sleeping workers
    bool try_insert_in_asleep_list( private_worker& t );

    //! Equivalent of adding additional_slack to my_slack and waking up to 2 threads if my_slack permits.
    void wake_some( int additional_slack );

    //! Private: destruction happens only through remove_server_ref().
    ~private_server() override;

    //! Drops one reference. The last one acknowledges the close to the client, then
    //! destroys and deallocates *this, so the caller must not touch the server afterwards.
    void remove_server_ref() {
        if( --my_ref_count==0 ) {
            my_client.acknowledge_close_connection();
            this->~private_server();
            tbb::cache_aligned_allocator<private_server>().deallocate( this, 1 );
        }
    }

    friend class private_worker;
public:
    private_server( tbb_client& client );

    version_type version() const override {
        return 0;
    }

    //! Asks every worker to shut down, then drops the reference held for the connection.
    void request_close_connection( bool /*exiting*/ ) override {
        for( std::size_t i=0; i<my_n_thread; ++i )
            my_thread_array[i].start_shutdown();
        remove_server_ref();
    }

    void yield() override { d0::yield(); }

    //! Not supported by this server; asserts in debug builds.
    void independent_thread_number_changed( int ) override {__TBB_ASSERT(false, nullptr);}

    //! governor::default_num_threads() minus one (presumably leaving a slot for the calling thread).
    unsigned default_concurrency() const override { return governor::default_num_threads() - 1; }

    void adjust_job_count_estimate( int delta ) override;

#if _WIN32 || _WIN64
    //! No-ops: this server does not track external threads.
    void register_external_thread ( ::rml::server::execution_resource_t& ) override {}
    void unregister_external_thread ( ::rml::server::execution_resource_t ) override {}
#endif /* _WIN32||_WIN64 */
};
20551c0b2f7Stbbdev 
20651c0b2f7Stbbdev //------------------------------------------------------------------------
20751c0b2f7Stbbdev // Methods of private_worker
20851c0b2f7Stbbdev //------------------------------------------------------------------------
#if _MSC_VER && !defined(__INTEL_COMPILER)
    // Suppress overzealous compiler warnings about an initialized variable 'sink_for_alloca' not referenced
    #pragma warning(push)
    #pragma warning(disable:4189)
#endif
#if __MINGW32__ && __GNUC__==4 &&__GNUC_MINOR__>=2 && !__MINGW64__
// ensure that stack is properly aligned for TBB threads
__attribute__((force_align_arg_pointer))
#endif
//! Entry point of each worker thread.
/** arg is the private_worker that passed itself to thread_monitor::launch() in wake_or_launch(). */
__RML_DECL_THREAD_ROUTINE private_worker::thread_routine( void* arg ) {
    private_worker* self = static_cast<private_worker*>(arg);
    // Macro defined elsewhere. Per the MSVC warning note above, it presumably allocas a
    // per-index amount of stack (into 'sink_for_alloca'), so it must stay in this frame
    // before run() is called.
    AVOID_64K_ALIASING( self->my_index );
    self->run();
    // return 0 instead of nullptr due to the difference in the type __RML_DECL_THREAD_ROUTINE on various OSs
    return 0;
}
#if _MSC_VER && !defined(__INTEL_COMPILER)
    #pragma warning(pop)
#endif
22851c0b2f7Stbbdev 
release_handle(thread_handle handle,bool join)22951c0b2f7Stbbdev void private_worker::release_handle(thread_handle handle, bool join) {
23051c0b2f7Stbbdev     if (join)
23151c0b2f7Stbbdev         thread_monitor::join(handle);
23251c0b2f7Stbbdev     else
23351c0b2f7Stbbdev         thread_monitor::detach_thread(handle);
23451c0b2f7Stbbdev }
23551c0b2f7Stbbdev 
start_shutdown()23651c0b2f7Stbbdev void private_worker::start_shutdown() {
23718e06f7fSAlex     __TBB_ASSERT(my_state.load(std::memory_order_relaxed) != st_quit, "The quit state is expected to be set only once");
23851c0b2f7Stbbdev 
23918e06f7fSAlex     // `acq` to acquire my_handle
24018e06f7fSAlex     // `rel` to release market state
24118e06f7fSAlex     state_t prev_state = my_state.exchange(st_quit, std::memory_order_acq_rel);
24218e06f7fSAlex 
24318e06f7fSAlex     if (prev_state == st_init) {
24418e06f7fSAlex         // Perform action that otherwise would be performed by associated thread when it quits.
24518e06f7fSAlex         my_server.remove_server_ref();
24618e06f7fSAlex     } else {
24718e06f7fSAlex         __TBB_ASSERT(prev_state == st_normal || prev_state == st_starting, nullptr);
24851c0b2f7Stbbdev         // May have invalidated invariant for sleeping, so wake up the thread.
24951c0b2f7Stbbdev         // Note that the notify() here occurs without maintaining invariants for my_slack.
25051c0b2f7Stbbdev         // It does not matter, because my_state==st_quit overrides checking of my_slack.
25151c0b2f7Stbbdev         my_thread_monitor.notify();
25251c0b2f7Stbbdev         // Do not need release handle in st_init state,
25351c0b2f7Stbbdev         // because in this case the thread wasn't started yet.
25451c0b2f7Stbbdev         // For st_starting release is done at launch site.
25518e06f7fSAlex         if (prev_state == st_normal)
25651c0b2f7Stbbdev             release_handle(my_handle, governor::does_client_join_workers(my_client));
25751c0b2f7Stbbdev     }
25851c0b2f7Stbbdev }
25951c0b2f7Stbbdev 
//! Body of a worker thread: process jobs while there is slack, sleep when oversubscribed,
//! and exit once the state becomes st_quit.
void private_worker::run() noexcept {
    // Continue the wake-up chain before doing any work of our own.
    my_server.propagate_chain_reaction();

    // Transiting to st_normal here would require setting my_handle,
    // which would create race with the launching thread and
    // complications in handle management on Windows.

    ::rml::job& j = *my_client.create_one_job();
    // memory_order_seq_cst to be strictly ordered after thread_monitor::wait on the next iteration
    while( my_state.load(std::memory_order_seq_cst)!=st_quit ) {
        if( my_server.my_slack.load(std::memory_order_acquire)>=0 ) {
            // Not oversubscribed: let the client run the job.
            my_client.process(j);
        } else if( my_server.try_insert_in_asleep_list(*this) ) {
            // Oversubscribed and now on the asleep list. Block until notified
            // (by wake_or_launch() or start_shutdown()).
            my_thread_monitor.wait();
            __TBB_ASSERT(my_state.load(std::memory_order_relaxed) == st_quit || !my_next, "Thread monitor missed a spurious wakeup?" );
            // Pass the wake-up on to up to two more sleeping workers.
            my_server.propagate_chain_reaction();
        }
        // Otherwise the asleep-list mutex was busy, or slack became non-negative: re-check.
    }
    my_client.cleanup(j);

    // This thread stops being active: account for it in my_slack, then drop its reference.
    // remove_server_ref() may destroy the server (and thus *this), so nothing may follow it.
    ++my_server.my_slack;
    my_server.remove_server_ref();
}
28351c0b2f7Stbbdev 
//! Wakes this worker's thread, or launches the thread if it was never started.
/** Called by wake_some() for a worker just popped from the asleep list (my_next already cleared). */
inline void private_worker::wake_or_launch() {
    state_t state = my_state.load(std::memory_order_relaxed);

    switch (state) {
    case st_starting:
        __TBB_fallthrough;
    case st_normal:
        // A thread exists or is being launched: notify its monitor.
        __TBB_ASSERT(!my_next, "Should not wake a thread while it's still in asleep list");
        my_thread_monitor.notify();
        break;
    case st_init:
        // No thread yet. This CAS races with start_shutdown() (init -> quit); only the winner proceeds.
        if (my_state.compare_exchange_strong(state, st_starting)) {
            // after this point, remove_server_ref() must be done by created thread
#if __TBB_USE_WINAPI
            // The Windows thread_monitor::launch assumes worker thread ids range from 1 up to the
            // hard limit set by TBB market::global_market. my_index runs 0..my_n_thread-1,
            // so worker_idx runs my_n_thread..1.
            const std::size_t worker_idx = my_server.my_n_thread - this->my_index;
            my_handle = thread_monitor::launch(thread_routine, this, my_server.my_stack_size, &worker_idx);
#elif __TBB_USE_POSIX
            {
                // Launch under affinity_helper with restore_process_mask=true; its destructor
                // restores this thread's original mask afterwards.
                affinity_helper fpa;
                fpa.protect_affinity_mask( /*restore_process_mask=*/true);
                my_handle = thread_monitor::launch(thread_routine, this, my_server.my_stack_size);
                // Implicit destruction of fpa resets original affinity mask.
            }
#endif /* __TBB_USE_POSIX */
            // my_handle is assigned; publish it by moving to st_normal.
            state = st_starting;
            if (!my_state.compare_exchange_strong(state, st_normal)) {
                // Do shutdown during startup. my_handle can't be released
                // by start_shutdown, because my_handle value might be not set yet
                // at time of transition from st_starting to st_quit.
                __TBB_ASSERT(state == st_quit, nullptr);
                release_handle(my_handle, governor::does_client_join_workers(my_client));
            }
        }
        break;
    default:
        // Already st_quit: nothing to wake.
        __TBB_ASSERT(state == st_quit, nullptr);
    }
}
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev //------------------------------------------------------------------------
32551c0b2f7Stbbdev // Methods of private_server
32651c0b2f7Stbbdev //------------------------------------------------------------------------
private_server(tbb_client & client)32751c0b2f7Stbbdev private_server::private_server( tbb_client& client ) :
32851c0b2f7Stbbdev     my_client(client),
32951c0b2f7Stbbdev     my_n_thread(client.max_job_count()),
33051c0b2f7Stbbdev     my_stack_size(client.min_stack_size()),
33151c0b2f7Stbbdev     my_slack(0),
33251c0b2f7Stbbdev     my_ref_count(my_n_thread+1),
33357f524caSIlya Isaev     my_thread_array(nullptr),
33457f524caSIlya Isaev     my_asleep_list_root(nullptr)
33551c0b2f7Stbbdev #if TBB_USE_ASSERT
33651c0b2f7Stbbdev     , my_net_slack_requests(0)
33751c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
33851c0b2f7Stbbdev {
33951c0b2f7Stbbdev     my_thread_array = tbb::cache_aligned_allocator<padded_private_worker>().allocate( my_n_thread );
34051c0b2f7Stbbdev     for( std::size_t i=0; i<my_n_thread; ++i ) {
34151c0b2f7Stbbdev         private_worker* t = new( &my_thread_array[i] ) padded_private_worker( *this, client, i );
34231199975SPavel         t->my_next = my_asleep_list_root.load(std::memory_order_relaxed);
34331199975SPavel         my_asleep_list_root.store(t, std::memory_order_relaxed);
34451c0b2f7Stbbdev     }
34551c0b2f7Stbbdev }
34651c0b2f7Stbbdev 
~private_server()34751c0b2f7Stbbdev private_server::~private_server() {
34857f524caSIlya Isaev     __TBB_ASSERT( my_net_slack_requests==0, nullptr);
34951c0b2f7Stbbdev     for( std::size_t i=my_n_thread; i--; )
35051c0b2f7Stbbdev         my_thread_array[i].~padded_private_worker();
35151c0b2f7Stbbdev     tbb::cache_aligned_allocator<padded_private_worker>().deallocate( my_thread_array, my_n_thread );
35251c0b2f7Stbbdev     tbb::detail::poison_pointer( my_thread_array );
35351c0b2f7Stbbdev }
35451c0b2f7Stbbdev 
try_insert_in_asleep_list(private_worker & t)35551c0b2f7Stbbdev inline bool private_server::try_insert_in_asleep_list( private_worker& t ) {
35651c0b2f7Stbbdev     asleep_list_mutex_type::scoped_lock lock;
35751c0b2f7Stbbdev     if( !lock.try_acquire(my_asleep_list_mutex) )
35851c0b2f7Stbbdev         return false;
35951c0b2f7Stbbdev     // Contribute to slack under lock so that if another takes that unit of slack,
36051c0b2f7Stbbdev     // it sees us sleeping on the list and wakes us up.
36131199975SPavel     auto expected = my_slack.load(std::memory_order_relaxed);
36231199975SPavel     while (expected < 0) {
36331199975SPavel         if (my_slack.compare_exchange_strong(expected, expected + 1)) {
36431199975SPavel             t.my_next = my_asleep_list_root.load(std::memory_order_relaxed);
36531199975SPavel             my_asleep_list_root.store(&t, std::memory_order_relaxed);
36651c0b2f7Stbbdev             return true;
36751c0b2f7Stbbdev         }
36851c0b2f7Stbbdev     }
36951c0b2f7Stbbdev 
37031199975SPavel     return false;
37131199975SPavel }
37231199975SPavel 
//! Adds additional_slack to my_slack, then wakes (or launches) up to two sleeping workers,
//! one per unit of slack it manages to claim.
void private_server::wake_some( int additional_slack ) {
    __TBB_ASSERT( additional_slack>=0, nullptr );
    // Workers popped from the asleep list; they are woken only after the mutex is released.
    private_worker* wakee[2];
    private_worker**w = wakee;

    if (additional_slack) {
        // Contribute our unused slack to my_slack.
        my_slack += additional_slack;
    }

    // Claim up to two units of slack with CAS, without holding the mutex.
    int allotted_slack = 0;
    while (allotted_slack < 2) {
        // Chain reaction; Try to claim unit of slack
        int old = my_slack.load(std::memory_order_relaxed);
        do {
            if (old <= 0) goto done;
        } while (!my_slack.compare_exchange_strong(old, old - 1));
        ++allotted_slack;
    }
done:

    if (allotted_slack) {
        // Under the mutex, pop one sleeping worker per claimed unit.
        asleep_list_mutex_type::scoped_lock lock(my_asleep_list_mutex);
        auto root = my_asleep_list_root.load(std::memory_order_relaxed);
        while( root && w<wakee+2 && allotted_slack) {
            --allotted_slack;
            // Pop sleeping worker to combine with claimed unit of slack
            *w++ = root;
            root = root->my_next;
        }
        my_asleep_list_root.store(root, std::memory_order_relaxed);
        if(allotted_slack) {
            // Contribute our unused slack to my_slack.
            // (The list held fewer workers than the units claimed.)
            my_slack += allotted_slack;
        }
    }
    // Outside the mutex: clear each popped worker's link (wake_or_launch() asserts it is
    // off the list), then wake or launch it.
    while( w>wakee ) {
        private_worker* ww = *--w;
        ww->my_next = nullptr;
        ww->wake_or_launch();
    }
}
41551c0b2f7Stbbdev 
adjust_job_count_estimate(int delta)41651c0b2f7Stbbdev void private_server::adjust_job_count_estimate( int delta ) {
41751c0b2f7Stbbdev #if TBB_USE_ASSERT
41851c0b2f7Stbbdev     my_net_slack_requests+=delta;
41951c0b2f7Stbbdev #endif /* TBB_USE_ASSERT */
42051c0b2f7Stbbdev     if( delta<0 ) {
42151c0b2f7Stbbdev         my_slack+=delta;
42251c0b2f7Stbbdev     } else if( delta>0 ) {
42351c0b2f7Stbbdev         wake_some( delta );
42451c0b2f7Stbbdev     }
42551c0b2f7Stbbdev }
42651c0b2f7Stbbdev 
42751c0b2f7Stbbdev //! Factory method called from task.cpp to create a private_server.
make_private_server(tbb_client & client)42851c0b2f7Stbbdev tbb_server* make_private_server( tbb_client& client ) {
42951c0b2f7Stbbdev     return new( tbb::cache_aligned_allocator<private_server>().allocate(1) ) private_server(client);
43051c0b2f7Stbbdev }
43151c0b2f7Stbbdev 
43251c0b2f7Stbbdev } // namespace rml
43351c0b2f7Stbbdev } // namespace r1
43451c0b2f7Stbbdev } // namespace detail
43551c0b2f7Stbbdev } // namespace tbb
43651c0b2f7Stbbdev 
437