1 /*
2 Copyright (c) 2022-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "threading_control.h"
18 #include "permit_manager.h"
19 #include "market.h"
20 #include "tcm_adaptor.h"
21 #include "thread_dispatcher.h"
22 #include "governor.h"
23 #include "thread_dispatcher_client.h"
24
25 namespace tbb {
26 namespace detail {
27 namespace r1 {
28
29 // ---------------------------------------- threading_control_impl --------------------------------------------------------------
30
31 std::size_t global_control_active_value_unsafe(d1::global_control::parameter);
32
calculate_workers_limits()33 std::pair<unsigned, unsigned> threading_control_impl::calculate_workers_limits() {
34 // Expecting that 4P is suitable for most applications.
35 // Limit to 2P for large thread number.
36 // TODO: ask RML for max concurrency and possibly correct hard_limit
37 unsigned factor = governor::default_num_threads() <= 128 ? 4 : 2;
38
39 // The requested number of threads is intentionally not considered in
40 // computation of the hard limit, in order to separate responsibilities
41 // and avoid complicated interactions between global_control and task_scheduler_init.
42 // The threading control guarantees that at least 256 threads might be created.
43 unsigned workers_app_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
44 unsigned workers_hard_limit = max(max(factor * governor::default_num_threads(), 256u), workers_app_limit);
45 unsigned workers_soft_limit = calc_workers_soft_limit(workers_hard_limit);
46
47 return std::make_pair(workers_soft_limit, workers_hard_limit);
48 }
49
calc_workers_soft_limit(unsigned workers_hard_limit)50 unsigned threading_control_impl::calc_workers_soft_limit(unsigned workers_hard_limit) {
51 unsigned workers_soft_limit{};
52 unsigned soft_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
53
54 // if user set no limits (yet), use default value
55 workers_soft_limit = soft_limit != 0 ? soft_limit - 1 : governor::default_num_threads() - 1;
56
57 if (workers_soft_limit >= workers_hard_limit) {
58 workers_soft_limit = workers_hard_limit - 1;
59 }
60
61 return workers_soft_limit;
62 }
63
make_permit_manager(unsigned workers_soft_limit)64 cache_aligned_unique_ptr<permit_manager> threading_control_impl::make_permit_manager(unsigned workers_soft_limit) {
65 if (tcm_adaptor::is_initialized()) {
66 auto tcm = make_cache_aligned_unique<tcm_adaptor>();
67 if (tcm->is_connected()) {
68 return tcm;
69 }
70 }
71 return make_cache_aligned_unique<market>(workers_soft_limit);
72 }
73
make_thread_dispatcher(threading_control & tc,unsigned workers_soft_limit,unsigned workers_hard_limit)74 cache_aligned_unique_ptr<thread_dispatcher> threading_control_impl::make_thread_dispatcher(threading_control& tc,
75 unsigned workers_soft_limit,
76 unsigned workers_hard_limit)
77 {
78 stack_size_type stack_size = global_control_active_value_unsafe(global_control::thread_stack_size);
79
80 cache_aligned_unique_ptr<thread_dispatcher> td =
81 make_cache_aligned_unique<thread_dispatcher>(tc, workers_hard_limit, stack_size);
82 // This check relies on the fact that for shared RML default_concurrency == max_concurrency
83 if (!governor::UsePrivateRML && td->my_server->default_concurrency() < workers_soft_limit) {
84 runtime_warning("RML might limit the number of workers to %u while %u is requested.\n",
85 td->my_server->default_concurrency(), workers_soft_limit);
86 }
87
88 return td;
89 }
90
// Builds the threading control implementation: computes worker limits, then
// constructs the permit manager, thread dispatcher, and the request serializer
// that connects them. Construction order is load-bearing: the serializer
// references the dispatcher, and the permit manager observes the serializer.
threading_control_impl::threading_control_impl(threading_control* tc) {
    unsigned workers_soft_limit{}, workers_hard_limit{};
    std::tie(workers_soft_limit, workers_hard_limit) = calculate_workers_limits();

    my_permit_manager = make_permit_manager(workers_soft_limit);
    my_thread_dispatcher = make_thread_dispatcher(*tc, workers_soft_limit, workers_hard_limit);
    my_thread_request_serializer =
        make_cache_aligned_unique<thread_request_serializer_proxy>(*my_thread_dispatcher, workers_soft_limit);
    // Route permit-manager thread requests through the serializer.
    my_permit_manager->set_thread_request_observer(*my_thread_request_serializer);

    my_cancellation_disseminator = make_cache_aligned_unique<cancellation_disseminator>();
    my_waiting_threads_monitor = make_cache_aligned_unique<thread_control_monitor>();
}
104
// Forwards termination to the thread dispatcher; when blocking_terminate is
// true the dispatcher is expected to wait for workers (see callers in
// threading_control::release).
void threading_control_impl::release(bool blocking_terminate) {
    my_thread_dispatcher->release(blocking_terminate);
}
108
// Applies a new soft limit on the number of active workers. The serializer is
// updated before the permit manager so that subsequent thread requests observe
// the new cap; the limit may never exceed the dispatcher's hard limit.
void threading_control_impl::set_active_num_workers(unsigned soft_limit) {
    __TBB_ASSERT(soft_limit <= my_thread_dispatcher->my_num_workers_hard_limit, nullptr);
    my_thread_request_serializer->set_active_num_workers(soft_limit);
    my_permit_manager->set_active_num_workers(soft_limit);
}
114
create_client(arena & a)115 threading_control_client threading_control_impl::create_client(arena& a) {
116 pm_client* pm_client = my_permit_manager->create_client(a);
117 thread_dispatcher_client* td_client = my_thread_dispatcher->create_client(a);
118
119 return threading_control_client{pm_client, td_client};
120 }
121
// Captures everything needed to later attempt client destruction: the ABA
// epoch and priority level of the dispatcher client plus both client pointers.
// The snapshot field order must match the client_snapshot declaration.
threading_control_impl::client_snapshot threading_control_impl::prepare_client_destruction(threading_control_client client) {
    auto td_client = client.get_thread_dispatcher_client();
    return {td_client->get_aba_epoch(), td_client->priority_level(), td_client, client.get_pm_client()};
}
126
try_destroy_client(threading_control_impl::client_snapshot snapshot)127 bool threading_control_impl::try_destroy_client(threading_control_impl::client_snapshot snapshot) {
128 if (my_thread_dispatcher->try_unregister_client(snapshot.my_td_client, snapshot.aba_epoch, snapshot.priority_level)) {
129 my_permit_manager->unregister_and_destroy_client(*snapshot.my_pm_client);
130 return true;
131 }
132 return false;
133 }
134
// Makes a freshly created client visible: registers it with the permit manager
// (applying the given constraints) and then with the thread dispatcher.
void threading_control_impl::publish_client(threading_control_client tc_client, d1::constraints& constraints) {
    my_permit_manager->register_client(tc_client.get_pm_client(), constraints);
    my_thread_dispatcher->register_client(tc_client.get_thread_dispatcher_client());
}
139
// Registers a thread with the cancellation disseminator so it participates in
// task-group state propagation.
void threading_control_impl::register_thread(thread_data& td) {
    my_cancellation_disseminator->register_thread(td);
}
// Removes a thread from the cancellation disseminator's bookkeeping.
void threading_control_impl::unregister_thread(thread_data& td) {
    my_cancellation_disseminator->unregister_thread(td);
}
146
// Propagates a task_group_context state change (given as a pointer-to-member
// of the atomic state field) from src to related contexts via the disseminator.
void threading_control_impl::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                        d1::task_group_context& src, uint32_t new_state)
{
    my_cancellation_disseminator->propagate_task_group_state(mptr_state, src, new_state);
}
152
// Returns the stack size used for worker threads, as held by the dispatcher.
std::size_t threading_control_impl::worker_stack_size() {
    return my_thread_dispatcher->worker_stack_size();
}
156
// Returns the hard limit on the number of workers (see calculate_workers_limits).
unsigned threading_control_impl::max_num_workers() {
    return my_thread_dispatcher->my_num_workers_hard_limit;
}
160
// Adjusts the worker demand for a client. The mandatory delta is registered
// with the request serializer first, then both deltas are forwarded to the
// permit manager for the client's pm_client.
void threading_control_impl::adjust_demand(threading_control_client tc_client, int mandatory_delta, int workers_delta) {
    auto& c = *tc_client.get_pm_client();
    my_thread_request_serializer->register_mandatory_request(mandatory_delta);
    my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
}
166
// Accessor for the monitor on which waiting threads park.
thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
    return *my_waiting_threads_monitor;
}
170
171 // ---------------------------------------- threading_control -------------------------------------------------------------------
172
173 // Defined in global_control.cpp
174 void global_control_lock();
175 void global_control_unlock();
176
add_ref(bool is_public)177 void threading_control::add_ref(bool is_public) {
178 ++my_ref_count;
179 if (is_public) {
180 my_public_ref_count++;
181 }
182 }
183
// Drops one reference (public or private) and returns true when it was the
// last one; in that case the global pointer is cleared and the caller is
// responsible for destroying the object.
bool threading_control::remove_ref(bool is_public) {
    if (is_public) {
        __TBB_ASSERT(g_threading_control == this, "Global threading control instance was destroyed prematurely?");
        __TBB_ASSERT(my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        --my_public_ref_count;
    }

    bool is_last_ref = --my_ref_count == 0;
    if (is_last_ref) {
        // No public reference may outlive the total reference count.
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        g_threading_control = nullptr;
    }

    return is_last_ref;
}
199
get_threading_control(bool is_public)200 threading_control* threading_control::get_threading_control(bool is_public) {
201 threading_control* control = g_threading_control;
202 if (control) {
203 control->add_ref(is_public);
204 }
205
206 return control;
207 }
208
// Creates (or returns the already-created) global threading control instance
// with a public reference taken for the caller. The global_control lock is
// acquired before g_threading_control_mutex; on exception the partially
// constructed instance is freed and the lock released by on_exception.
threading_control* threading_control::create_threading_control() {
    // Global control should be locked before threading_control_impl
    global_control_lock();

    threading_control* thr_control{ nullptr };
    try_call([&] {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);

        // Another thread may have created the instance while we were acquiring
        // the locks; in that case a public reference was just taken on it.
        thr_control = get_threading_control(/*public = */ true);
        if (thr_control == nullptr) {
            // Placement-new into cache-aligned storage; start with one public
            // and one private reference held on behalf of the creator.
            thr_control = new (cache_aligned_allocate(sizeof(threading_control))) threading_control(/*public_ref = */ 1, /*private_ref = */ 1);
            thr_control->my_pimpl = make_cache_aligned_unique<threading_control_impl>(thr_control);

            __TBB_InitOnce::add_ref();

            // An active scheduler handle keeps an extra public/private pair of
            // references so the instance survives until the handle is released.
            if (global_control_active_value_unsafe(global_control::scheduler_handle)) {
                ++thr_control->my_public_ref_count;
                ++thr_control->my_ref_count;
            }

            g_threading_control = thr_control;
        }
    }).on_exception([&] {
        // Roll back: release the global control lock and free the object.
        global_control_unlock();

        cache_aligned_deleter deleter{};
        deleter(thr_control);
    });

    global_control_unlock();
    return thr_control;
}
241
// Destroys this instance (which was placement-new'ed into cache-aligned
// storage) and drops the __TBB_InitOnce reference taken at creation.
void threading_control::destroy () {
    cache_aligned_deleter deleter;
    deleter(this);
    __TBB_InitOnce::remove_ref();
}
247
// Spins (with the global mutex temporarily released) until this thread holds
// the only remaining reference besides its own public one. The outer loop
// re-checks under the lock because the inner wait runs unlocked.
void threading_control::wait_last_reference(global_mutex_type::scoped_lock& lock) {
    while (my_public_ref_count.load(std::memory_order_relaxed) == 1 && my_ref_count.load(std::memory_order_relaxed) > 1) {
        lock.release();
        // To guarantee that request_close_connection() is called by the last external thread, we need to wait till all
        // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
        // Theoretically, new private references to the threading control can be added during waiting making it potentially
        // endless.
        // TODO: revise why the weak scheduler needs threading control's pointer and try to remove this wait.
        // Note that the threading control should know about its schedulers for cancellation/exception/priority propagation,
        // see e.g. task_group_context::cancel_group_execution()
        while (my_public_ref_count.load(std::memory_order_acquire) == 1 && my_ref_count.load(std::memory_order_acquire) > 1) {
            yield();
        }
        lock.acquire(g_threading_control_mutex);
    }
}
264
// Drops a reference and, if it was the last one, tells the dispatcher to shut
// down (blocking if requested). Returns true only when a blocking terminate
// actually released the instance. Blocking terminate first waits (under the
// global mutex) until this thread is the last external holder.
bool threading_control::release(bool is_public, bool blocking_terminate) {
    bool do_release = false;
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        if (blocking_terminate) {
            __TBB_ASSERT(is_public, "Only an object with a public reference can request the blocking terminate");
            wait_last_reference(lock);
        }
        do_release = remove_ref(is_public);
    }

    if (do_release) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), "No public references must remain if we remove the threading control.");
        // inform RML that blocking termination is required
        my_pimpl->release(blocking_terminate);
        return blocking_terminate;
    }
    return false;
}
284
// Initializes the reference counters; my_pimpl is attached separately by
// create_threading_control() after construction.
threading_control::threading_control(unsigned public_ref, unsigned ref) : my_public_ref_count(public_ref), my_ref_count(ref)
{}
287
// Takes a public reference on the global instance, creating it if needed.
// The mutex is released before create_threading_control() because that path
// must acquire the global_control lock first (see lock-order comment there).
threading_control* threading_control::register_public_reference() {
    threading_control* control{nullptr};
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    control = get_threading_control(/*public = */ true);
    if (!control) {
        // We are going to create threading_control_impl, we should acquire mutexes in right order
        lock.release();
        control = create_threading_control();
    }

    return control;
}
300
// Drops a public reference on the global instance; forwards the blocking
// terminate request. The instance must still exist at this point.
bool threading_control::unregister_public_reference(bool blocking_terminate) {
    __TBB_ASSERT(g_threading_control, "Threading control should exist until last public reference");
    __TBB_ASSERT(g_threading_control->my_public_ref_count.load(std::memory_order_relaxed), nullptr);
    return g_threading_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
}
306
// Creates clients for an arena. A private reference is taken under the global
// mutex (the scoped block limits the lock to just the refcount update); the
// matching release happens in try_destroy_client().
threading_control_client threading_control::create_client(arena& a) {
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        add_ref(/*public = */ false);
    }

    return my_pimpl->create_client(a);
}
315
publish_client(threading_control_client client,d1::constraints & constraints)316 void threading_control::publish_client(threading_control_client client, d1::constraints& constraints) {
317 return my_pimpl->publish_client(client, constraints);
318 }
319
// Snapshots the state needed to later attempt destruction of a client.
threading_control::client_snapshot threading_control::prepare_client_destruction(threading_control_client client) {
    return my_pimpl->prepare_client_destruction(client);
}
323
try_destroy_client(threading_control::client_snapshot deleter)324 bool threading_control::try_destroy_client(threading_control::client_snapshot deleter) {
325 bool res = my_pimpl->try_destroy_client(deleter);
326 if (res) {
327 release(/*public = */ false, /*blocking_terminate = */ false);
328 }
329 return res;
330 }
331
// Static entry point: applies a new soft limit to the global instance, if one
// exists. Takes and releases a temporary private reference to keep the
// instance alive across the call.
void threading_control::set_active_num_workers(unsigned soft_limit) {
    threading_control* thr_control = get_threading_control(/*public = */ false);
    if (thr_control != nullptr) {
        thr_control->my_pimpl->set_active_num_workers(soft_limit);
        thr_control->release(/*is_public=*/false, /*blocking_terminate=*/false);
    }
}
339
// Reports whether the global threading control instance currently exists.
bool threading_control::is_present() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control != nullptr;
}
344
// Takes a public reference on an existing global instance, if any; returns
// false when there is nothing to pin (no instance is created here).
bool threading_control::register_lifetime_control() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return get_threading_control(/*public = */ true) != nullptr;
}
349
unregister_lifetime_control(bool blocking_terminate)350 bool threading_control::unregister_lifetime_control(bool blocking_terminate) {
351 threading_control* thr_control{nullptr};
352 {
353 global_mutex_type::scoped_lock lock(g_threading_control_mutex);
354 thr_control = g_threading_control;
355 }
356
357 bool released{true};
358 if (thr_control) {
359 released = thr_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
360 }
361
362 return released;
363 }
364
// Registers a thread for task-group state propagation (see impl).
void threading_control::register_thread(thread_data& td) {
    my_pimpl->register_thread(td);
}
368
// Unregisters a thread from task-group state propagation (see impl).
void threading_control::unregister_thread(thread_data& td) {
    my_pimpl->unregister_thread(td);
}
372
// Propagates a task_group_context state change to related contexts (see impl).
void threading_control::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                   d1::task_group_context& src, uint32_t new_state)
{
    my_pimpl->propagate_task_group_state(mptr_state, src, new_state);
}
378
// Returns the stack size used for worker threads.
std::size_t threading_control::worker_stack_size() {
    return my_pimpl->worker_stack_size();
}
382
// Static accessor: hard limit on workers of the global instance, or 0 when no
// instance exists. Reads g_threading_control under the global mutex.
unsigned threading_control::max_num_workers() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control ? g_threading_control->my_pimpl->max_num_workers() : 0;
}
387
// Adjusts the worker demand for a client (see impl for ordering details).
void threading_control::adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta) {
    my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
}
391
// Accessor for the monitor on which waiting threads park.
thread_control_monitor& threading_control::get_waiting_threads_monitor() {
    return my_pimpl->get_waiting_threads_monitor();
}
395
396 } // r1
397 } // detail
398 } // tbb
399