xref: /oneTBB/src/tbb/thread_dispatcher.cpp (revision c4568449)
/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "thread_dispatcher.h"
#include "threading_control.h"

namespace tbb {
namespace detail {
namespace r1 {

thread_dispatcher::thread_dispatcher(threading_control& tc, unsigned hard_limit, std::size_t stack_size)
    : my_threading_control(tc)
    , my_num_workers_hard_limit(hard_limit)
    , my_stack_size(stack_size)
{
    my_server = governor::create_rml_server( *this );
    __TBB_ASSERT( my_server, "Failed to create RML server" );
}

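// The destructor only poisons my_server as a debug-mode marker that the
// dispatcher has been torn down; the server itself is shut down through
// release() below, which asks RML to close the connection.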
thread_dispatcher::~thread_dispatcher() {
    poison_pointer(my_server);
}

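// Selects the client that should receive workers next: scan all priority
// levels above the hint's one (a lower index means a higher priority) and
// return the front client of the first non-empty list, or the hint itself
// if nothing with higher priority is registered.
//
// Worked example (hypothetical values): with num_priority_levels == 3,
// level 0 empty, client A at level 1, and a level-2 hint B, the loop scans
// levels [0, 2) and returns A; were levels 0 and 1 both empty, B would be
// returned unchanged.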
thread_dispatcher_client* thread_dispatcher::select_next_client(thread_dispatcher_client* hint) {
    unsigned next_client_priority_level = num_priority_levels;
    if (hint) {
        next_client_priority_level = hint->priority_level();
    }

    for (unsigned idx = 0; idx < next_client_priority_level; ++idx) {
        if (!my_client_list[idx].empty()) {
            return &*my_client_list[idx].begin();
        }
    }

    return hint;
}

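// Clients are placement-constructed in cache-aligned storage and torn down by
// the matching destroy_client() below. The current ABA epoch is captured at
// creation so that try_unregister_client() can later detect whether a stored
// pointer still refers to this client or to a recycled one.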
thread_dispatcher_client* thread_dispatcher::create_client(arena& a) {
    return new (cache_aligned_allocate(sizeof(thread_dispatcher_client))) thread_dispatcher_client(a, my_clients_aba_epoch);
}

void thread_dispatcher::register_client(thread_dispatcher_client* client) {
    client_list_mutex_type::scoped_lock lock(my_list_mutex);
    insert_client(*client);
}

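// Attempts to remove and destroy a client. Succeeds only if the client is
// still registered at the given priority, its ABA epoch matches (i.e. the
// pointer has not been reused for a newer client), and it is abandoned,
// with no references and no outstanding worker request; returns false
// otherwise.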
bool thread_dispatcher::try_unregister_client(thread_dispatcher_client* client, std::uint64_t aba_epoch, unsigned priority) {
    __TBB_ASSERT(client, nullptr);
    // We hold a reference to the server, so the market cannot be destroyed here.
    __TBB_ASSERT(!is_poisoned(my_server), nullptr);
    my_list_mutex.lock();
    for (auto& it : my_client_list[priority]) {
        if (client == &it) {
            if (it.get_aba_epoch() == aba_epoch) {
                // Client is alive
                // Acquire my_references to sync with threads that have just left the arena.
                // Note that references must be read before workers_requested: if references
                // is nonzero, some other thread might still call adjust_demand and cause a
                // race over workers_requested.
                if (!client->references() && !client->has_request()) {
                    // Client is abandoned. Destroy it.
                    remove_client(*client);
                    ++my_clients_aba_epoch;

                    my_list_mutex.unlock();
                    destroy_client(client);

                    return true;
                }
            }
            break;
        }
    }
    my_list_mutex.unlock();
    return false;
}

void thread_dispatcher::destroy_client(thread_dispatcher_client* client) {
    client->~thread_dispatcher_client();
    cache_aligned_deallocate(client);
}

// Should be called under lock
void thread_dispatcher::insert_client(thread_dispatcher_client& client) {
    __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr);
    my_client_list[client.priority_level()].push_front(client);

    __TBB_ASSERT(!my_next_client || my_next_client->priority_level() < num_priority_levels, nullptr);
    my_next_client = select_next_client(my_next_client);
}

// Should be called under lock
void thread_dispatcher::remove_client(thread_dispatcher_client& client) {
    __TBB_ASSERT(client.priority_level() < num_priority_levels, nullptr);
    my_client_list[client.priority_level()].remove(client);

    if (my_next_client == &client) {
        my_next_client = nullptr;
    }
    my_next_client = select_next_client(my_next_client);
}

bool thread_dispatcher::is_client_alive(thread_dispatcher_client* client) {
    if (!client) {
        return false;
    }

    // We still cannot access the client's internals since the object itself might be destroyed.
    for (auto& priority_list : my_client_list) {
        for (auto& c : priority_list) {
            if (client == &c) {
                return true;
            }
        }
    }
    return false;
}

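// Round-robin search over all registered clients for one that can accept a
// worker: start just after the hint, wrap across non-empty priority lists,
// and stop once the traversal returns to the hint. try_join() is assumed to
// both check for demand and reserve a slot for the joining thread, so a
// non-null result is ready to be processed.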
thread_dispatcher_client* thread_dispatcher::client_in_need(client_list_type* clients, thread_dispatcher_client* hint) {
    // TODO: make sure a client with higher priority is returned only if it has available slots.
    hint = select_next_client(hint);
    if (!hint) {
        return nullptr;
    }

    client_list_type::iterator it = hint;
    unsigned curr_priority_level = hint->priority_level();
    __TBB_ASSERT(it != clients[curr_priority_level].end(), nullptr);
    do {
        thread_dispatcher_client& t = *it;
        if (++it == clients[curr_priority_level].end()) {
            do {
                ++curr_priority_level %= num_priority_levels;
            } while (clients[curr_priority_level].empty());
            it = clients[curr_priority_level].begin();
        }
        if (t.try_join()) {
            return &t;
        }
    } while (it != hint);
    return nullptr;
}

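// Worker-facing overload: takes the list mutex as a reader and re-validates
// that prev still points to a live client (its memory may have been freed
// since the worker last ran) before using it as the search hint; otherwise
// the search starts from my_next_client.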
thread_dispatcher_client* thread_dispatcher::client_in_need(thread_dispatcher_client* prev) {
    client_list_mutex_type::scoped_lock lock(my_list_mutex, /*is_writer=*/false);
    if (is_client_alive(prev)) {
        return client_in_need(my_client_list, prev);
    }
    return client_in_need(my_client_list, my_next_client);
}

void thread_dispatcher::adjust_job_count_estimate(int delta) {
    my_server->adjust_job_count_estimate(delta);
}

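// Initiates shutdown of the RML connection. blocking_terminate selects the
// join mode that RML later queries via must_join_workers(): when true,
// worker threads are presumably joined rather than detached at termination.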
void thread_dispatcher::release(bool blocking_terminate) {
    my_join_workers = blocking_terminate;
    my_server->request_close_connection();
}

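// Worker thread main loop, entered from RML. The worker repeatedly serves
// whichever client needs a thread, remembering the last served client as the
// hint for the next search. Two passes separated by a yield() soften the race
// described below, where demand drops just after a worker runs out of work.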
void thread_dispatcher::process(job& j) {
    thread_data& td = static_cast<thread_data&>(j);
    // td.my_last_client may be dead; do not access it until client_in_need is called.
    thread_dispatcher_client* client = td.my_last_client;
    for (int i = 0; i < 2; ++i) {
        while ((client = client_in_need(client))) {
            td.my_last_client = client;
            client->process(td);
        }
        // Workers leave the thread_dispatcher when no client is in need. This can happen
        // before adjust_job_count_estimate() decreases my_slack, so RML may not yet put
        // this thread to sleep, resulting in a busy loop that sees my_slack < 0 and calls
        // this method again immediately. The yield below softens that spinning.
        if ( !i ) {
            yield();
        }
    }
}

//! Used when RML asks for the join mode during worker termination.
bool thread_dispatcher::must_join_workers() const { return my_join_workers; }

//! Returns the requested stack size of worker threads.
std::size_t thread_dispatcher::worker_stack_size() const { return my_stack_size; }

void thread_dispatcher::acknowledge_close_connection() {
    my_threading_control.destroy();
}

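// RML callback that constructs the thread_data for a newly created worker.
// Indices are handed out monotonically starting from 1; index 0 is presumably
// reserved for external (non-worker) threads, hence the index > 0 assertion.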
::rml::job* thread_dispatcher::create_one_job() {
    unsigned short index = ++my_first_unused_worker_idx;
    __TBB_ASSERT(index > 0, nullptr);
    ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
    // The index serves as a hint that decreases conflicts between workers when they migrate between arenas
    thread_data* td = new (cache_aligned_allocate(sizeof(thread_data))) thread_data{ index, true };
    __TBB_ASSERT(index <= my_num_workers_hard_limit, nullptr);
    my_threading_control.register_thread(*td);
    return td;
}

void thread_dispatcher::cleanup(job& j) {
    my_threading_control.unregister_thread(static_cast<thread_data&>(j));
    governor::auto_terminate(&j);
}

} // namespace r1
} // namespace detail
} // namespace tbb