xref: /oneTBB/src/tbb/threading_control.cpp (revision 71e1bb8e)
1 /*
2     Copyright (c) 2022-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "threading_control.h"
18 #include "permit_manager.h"
19 #include "market.h"
20 #include "tcm_adaptor.h"
21 #include "thread_dispatcher.h"
22 #include "governor.h"
23 #include "thread_dispatcher_client.h"
24 
25 namespace tbb {
26 namespace detail {
27 namespace r1 {
28 
29 // ---------------------------------------- threading_control_impl --------------------------------------------------------------
30 
31 std::size_t global_control_active_value_unsafe(d1::global_control::parameter);
32 
calculate_workers_limits()33 std::pair<unsigned, unsigned> threading_control_impl::calculate_workers_limits() {
34     // Expecting that 4P is suitable for most applications.
35     // Limit to 2P for large thread number.
36     // TODO: ask RML for max concurrency and possibly correct hard_limit
37     unsigned factor = governor::default_num_threads() <= 128 ? 4 : 2;
38 
39     // The requested number of threads is intentionally not considered in
40     // computation of the hard limit, in order to separate responsibilities
41     // and avoid complicated interactions between global_control and task_scheduler_init.
42     // The threading control guarantees that at least 256 threads might be created.
43     unsigned workers_app_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
44     unsigned workers_hard_limit = max(max(factor * governor::default_num_threads(), 256u), workers_app_limit);
45     unsigned workers_soft_limit = calc_workers_soft_limit(workers_hard_limit);
46 
47     return std::make_pair(workers_soft_limit, workers_hard_limit);
48 }
49 
calc_workers_soft_limit(unsigned workers_hard_limit)50 unsigned threading_control_impl::calc_workers_soft_limit(unsigned workers_hard_limit) {
51     unsigned workers_soft_limit{};
52     unsigned soft_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
53 
54     // if user set no limits (yet), use default value
55     workers_soft_limit = soft_limit != 0 ? soft_limit - 1 : governor::default_num_threads() - 1;
56 
57     if (workers_soft_limit >= workers_hard_limit) {
58         workers_soft_limit = workers_hard_limit - 1;
59     }
60 
61     return workers_soft_limit;
62 }
63 
make_permit_manager(unsigned workers_soft_limit)64 cache_aligned_unique_ptr<permit_manager> threading_control_impl::make_permit_manager(unsigned workers_soft_limit) {
65      if (tcm_adaptor::is_initialized()) {
66         auto tcm = make_cache_aligned_unique<tcm_adaptor>();
67         if (tcm->is_connected()) {
68             return tcm;
69         }
70     }
71     return make_cache_aligned_unique<market>(workers_soft_limit);
72 }
73 
make_thread_dispatcher(threading_control & tc,unsigned workers_soft_limit,unsigned workers_hard_limit)74 cache_aligned_unique_ptr<thread_dispatcher> threading_control_impl::make_thread_dispatcher(threading_control& tc,
75                                                                                            unsigned workers_soft_limit,
76                                                                                            unsigned workers_hard_limit)
77 {
78     stack_size_type stack_size = global_control_active_value_unsafe(global_control::thread_stack_size);
79 
80     cache_aligned_unique_ptr<thread_dispatcher> td =
81         make_cache_aligned_unique<thread_dispatcher>(tc, workers_hard_limit, stack_size);
82     // This check relies on the fact that for shared RML default_concurrency == max_concurrency
83     if (!governor::UsePrivateRML && td->my_server->default_concurrency() < workers_soft_limit) {
84         runtime_warning("RML might limit the number of workers to %u while %u is requested.\n",
85             td->my_server->default_concurrency(), workers_soft_limit);
86     }
87 
88     return td;
89 }
90 
// Builds all subcomponents of the threading control. Construction order is
// significant: the permit manager and thread dispatcher must exist before the
// request serializer, which is then installed as the permit manager's observer.
threading_control_impl::threading_control_impl(threading_control* tc) {
    unsigned workers_soft_limit{}, workers_hard_limit{};
    std::tie(workers_soft_limit, workers_hard_limit) = calculate_workers_limits();

    my_permit_manager = make_permit_manager(workers_soft_limit);
    my_thread_dispatcher = make_thread_dispatcher(*tc, workers_soft_limit, workers_hard_limit);
    // The serializer funnels thread requests from the permit manager into the dispatcher.
    my_thread_request_serializer =
        make_cache_aligned_unique<thread_request_serializer_proxy>(*my_thread_dispatcher, workers_soft_limit);
    my_permit_manager->set_thread_request_observer(*my_thread_request_serializer);

    my_cancellation_disseminator = make_cache_aligned_unique<cancellation_disseminator>();
    my_waiting_threads_monitor = make_cache_aligned_unique<thread_control_monitor>();
}
104 
release(bool blocking_terminate)105 void threading_control_impl::release(bool blocking_terminate) {
106     my_thread_dispatcher->release(blocking_terminate);
107 }
108 
set_active_num_workers(unsigned soft_limit)109 void threading_control_impl::set_active_num_workers(unsigned soft_limit) {
110     __TBB_ASSERT(soft_limit <= my_thread_dispatcher->my_num_workers_hard_limit, nullptr);
111     my_thread_request_serializer->set_active_num_workers(soft_limit);
112     my_permit_manager->set_active_num_workers(soft_limit);
113 }
114 
create_client(arena & a)115 threading_control_client threading_control_impl::create_client(arena& a) {
116     pm_client* pm_client = my_permit_manager->create_client(a);
117     thread_dispatcher_client* td_client = my_thread_dispatcher->create_client(a);
118 
119     return threading_control_client{pm_client, td_client};
120 }
121 
prepare_client_destruction(threading_control_client client)122 threading_control_impl::client_snapshot threading_control_impl::prepare_client_destruction(threading_control_client client) {
123     auto td_client = client.get_thread_dispatcher_client();
124     return {td_client->get_aba_epoch(), td_client->priority_level(), td_client, client.get_pm_client()};
125 }
126 
try_destroy_client(threading_control_impl::client_snapshot snapshot)127 bool threading_control_impl::try_destroy_client(threading_control_impl::client_snapshot snapshot) {
128     if (my_thread_dispatcher->try_unregister_client(snapshot.my_td_client, snapshot.aba_epoch, snapshot.priority_level)) {
129         my_permit_manager->unregister_and_destroy_client(*snapshot.my_pm_client);
130         return true;
131     }
132     return false;
133 }
134 
publish_client(threading_control_client tc_client,d1::constraints & constraints)135 void threading_control_impl::publish_client(threading_control_client tc_client, d1::constraints& constraints) {
136     my_permit_manager->register_client(tc_client.get_pm_client(), constraints);
137     my_thread_dispatcher->register_client(tc_client.get_thread_dispatcher_client());
138 }
139 
register_thread(thread_data & td)140 void threading_control_impl::register_thread(thread_data& td) {
141     my_cancellation_disseminator->register_thread(td);
142 }
unregister_thread(thread_data & td)143 void threading_control_impl::unregister_thread(thread_data& td) {
144     my_cancellation_disseminator->unregister_thread(td);
145 }
146 
propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::* mptr_state,d1::task_group_context & src,uint32_t new_state)147 void threading_control_impl::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
148                                                         d1::task_group_context& src, uint32_t new_state)
149 {
150     my_cancellation_disseminator->propagate_task_group_state(mptr_state, src, new_state);
151 }
152 
worker_stack_size()153 std::size_t threading_control_impl::worker_stack_size() {
154     return my_thread_dispatcher->worker_stack_size();
155 }
156 
max_num_workers()157 unsigned threading_control_impl::max_num_workers() {
158     return my_thread_dispatcher->my_num_workers_hard_limit;
159 }
160 
adjust_demand(threading_control_client tc_client,int mandatory_delta,int workers_delta)161 void threading_control_impl::adjust_demand(threading_control_client tc_client, int mandatory_delta, int workers_delta) {
162     auto& c = *tc_client.get_pm_client();
163     my_thread_request_serializer->register_mandatory_request(mandatory_delta);
164     my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
165 }
166 
get_waiting_threads_monitor()167 thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
168     return *my_waiting_threads_monitor;
169 }
170 
171 // ---------------------------------------- threading_control -------------------------------------------------------------------
172 
173 // Defined in global_control.cpp
174 void global_control_lock();
175 void global_control_unlock();
176 
add_ref(bool is_public)177 void threading_control::add_ref(bool is_public) {
178     ++my_ref_count;
179     if (is_public) {
180         my_public_ref_count++;
181     }
182 }
183 
remove_ref(bool is_public)184 bool threading_control::remove_ref(bool is_public) {
185     if (is_public) {
186         __TBB_ASSERT(g_threading_control == this, "Global threading control instance was destroyed prematurely?");
187         __TBB_ASSERT(my_public_ref_count.load(std::memory_order_relaxed), nullptr);
188         --my_public_ref_count;
189     }
190 
191     bool is_last_ref = --my_ref_count == 0;
192     if (is_last_ref) {
193         __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), nullptr);
194         g_threading_control = nullptr;
195     }
196 
197     return is_last_ref;
198 }
199 
get_threading_control(bool is_public)200 threading_control* threading_control::get_threading_control(bool is_public) {
201     threading_control* control = g_threading_control;
202     if (control) {
203         control->add_ref(is_public);
204     }
205 
206     return control;
207 }
208 
// Creates (or re-acquires) the singleton threading control.
// Lock ordering: the global_control lock is taken before g_threading_control_mutex.
threading_control* threading_control::create_threading_control() {
    // Global control should be locked before threading_control_impl
    global_control_lock();

    threading_control* thr_control{ nullptr };
    try_call([&] {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);

        // Another thread may have published an instance while we were acquiring the locks.
        thr_control = get_threading_control(/*public = */ true);
        if (thr_control == nullptr) {
            thr_control =  new (cache_aligned_allocate(sizeof(threading_control))) threading_control(/*public_ref = */ 1, /*private_ref = */ 1);
            thr_control->my_pimpl = make_cache_aligned_unique<threading_control_impl>(thr_control);

            __TBB_InitOnce::add_ref();

            // An active scheduler handle keeps an extra public+private reference pair.
            if (global_control_active_value_unsafe(global_control::scheduler_handle)) {
                ++thr_control->my_public_ref_count;
                ++thr_control->my_ref_count;
            }

            g_threading_control = thr_control;
        }
    }).on_exception([&] {
        // Construction failed: release the global lock and reclaim the raw allocation
        // before the exception propagates to the caller.
        global_control_unlock();

        cache_aligned_deleter deleter{};
        deleter(thr_control);
    });

    global_control_unlock();
    return thr_control;
}
241 
destroy()242 void threading_control::destroy () {
243     cache_aligned_deleter deleter;
244     deleter(this);
245     __TBB_InitOnce::remove_ref();
246 }
247 
// Blocks (yielding) until the calling external thread holds the last public
// reference and no other private references remain. The global mutex is
// dropped while spinning so other threads can release their references.
void threading_control::wait_last_reference(global_mutex_type::scoped_lock& lock) {
    while (my_public_ref_count.load(std::memory_order_relaxed) == 1 && my_ref_count.load(std::memory_order_relaxed) > 1) {
        lock.release();
        // To guarantee that request_close_connection() is called by the last external thread, we need to wait till all
        // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
        // Theoretically, new private references to the threading control can be added during waiting making it potentially
        // endless.
        // TODO: revise why the weak scheduler needs threading control's pointer and try to remove this wait.
        // Note that the threading control should know about its schedulers for cancellation/exception/priority propagation,
        // see e.g. task_group_context::cancel_group_execution()
        while (my_public_ref_count.load(std::memory_order_acquire) == 1 && my_ref_count.load(std::memory_order_acquire) > 1) {
            yield();
        }
        // Re-acquire before re-checking under the lock; the outer loop revalidates the condition.
        lock.acquire(g_threading_control_mutex);
    }
}
264 
// Drops one reference. When this removes the last reference, the dispatcher is
// asked to terminate (blocking if requested). Returns true only when a blocking
// terminate was actually performed.
bool threading_control::release(bool is_public, bool blocking_terminate) {
    bool do_release = false;
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        if (blocking_terminate) {
            __TBB_ASSERT(is_public, "Only an object with a public reference can request the blocking terminate");
            // Wait until we are the last holder before tearing everything down.
            wait_last_reference(lock);
        }
        do_release = remove_ref(is_public);
    }

    if (do_release) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), "No public references must remain if we remove the threading control.");
        // inform RML that blocking termination is required
        my_pimpl->release(blocking_terminate);
        return blocking_terminate;
    }
    return false;
}
284 
// Initializes the two reference counters; my_pimpl is attached afterwards by
// create_threading_control().
threading_control::threading_control(unsigned public_ref, unsigned ref) : my_public_ref_count(public_ref), my_ref_count(ref)
{}
287 
register_public_reference()288 threading_control* threading_control::register_public_reference() {
289     threading_control* control{nullptr};
290     global_mutex_type::scoped_lock lock(g_threading_control_mutex);
291     control = get_threading_control(/*public = */ true);
292     if (!control) {
293         // We are going to create threading_control_impl, we should acquire mutexes in right order
294         lock.release();
295         control = create_threading_control();
296     }
297 
298     return control;
299 }
300 
unregister_public_reference(bool blocking_terminate)301 bool threading_control::unregister_public_reference(bool blocking_terminate) {
302     __TBB_ASSERT(g_threading_control, "Threading control should exist until last public reference");
303     __TBB_ASSERT(g_threading_control->my_public_ref_count.load(std::memory_order_relaxed), nullptr);
304     return g_threading_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
305 }
306 
create_client(arena & a)307 threading_control_client threading_control::create_client(arena& a) {
308     {
309         global_mutex_type::scoped_lock lock(g_threading_control_mutex);
310         add_ref(/*public = */ false);
311     }
312 
313     return my_pimpl->create_client(a);
314 }
315 
publish_client(threading_control_client client,d1::constraints & constraints)316 void threading_control::publish_client(threading_control_client client, d1::constraints& constraints) {
317     return my_pimpl->publish_client(client, constraints);
318 }
319 
prepare_client_destruction(threading_control_client client)320 threading_control::client_snapshot threading_control::prepare_client_destruction(threading_control_client client) {
321     return my_pimpl->prepare_client_destruction(client);
322 }
323 
try_destroy_client(threading_control::client_snapshot deleter)324 bool threading_control::try_destroy_client(threading_control::client_snapshot deleter) {
325     bool res = my_pimpl->try_destroy_client(deleter);
326     if (res) {
327         release(/*public = */ false, /*blocking_terminate = */ false);
328     }
329     return res;
330 }
331 
set_active_num_workers(unsigned soft_limit)332 void threading_control::set_active_num_workers(unsigned soft_limit) {
333     threading_control* thr_control = get_threading_control(/*public = */ false);
334     if (thr_control != nullptr) {
335         thr_control->my_pimpl->set_active_num_workers(soft_limit);
336         thr_control->release(/*is_public=*/false, /*blocking_terminate=*/false);
337     }
338 }
339 
is_present()340 bool threading_control::is_present() {
341     global_mutex_type::scoped_lock lock(g_threading_control_mutex);
342     return g_threading_control != nullptr;
343 }
344 
register_lifetime_control()345 bool threading_control::register_lifetime_control() {
346     global_mutex_type::scoped_lock lock(g_threading_control_mutex);
347     return get_threading_control(/*public = */ true) != nullptr;
348 }
349 
unregister_lifetime_control(bool blocking_terminate)350 bool threading_control::unregister_lifetime_control(bool blocking_terminate) {
351     threading_control* thr_control{nullptr};
352     {
353         global_mutex_type::scoped_lock lock(g_threading_control_mutex);
354         thr_control = g_threading_control;
355     }
356 
357     bool released{true};
358     if (thr_control) {
359         released = thr_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
360     }
361 
362     return released;
363 }
364 
register_thread(thread_data & td)365 void threading_control::register_thread(thread_data& td) {
366     my_pimpl->register_thread(td);
367 }
368 
unregister_thread(thread_data & td)369 void threading_control::unregister_thread(thread_data& td) {
370     my_pimpl->unregister_thread(td);
371 }
372 
propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::* mptr_state,d1::task_group_context & src,uint32_t new_state)373 void threading_control::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
374                                                    d1::task_group_context& src, uint32_t new_state)
375 {
376     my_pimpl->propagate_task_group_state(mptr_state, src, new_state);
377 }
378 
worker_stack_size()379 std::size_t threading_control::worker_stack_size() {
380     return my_pimpl->worker_stack_size();
381 }
382 
max_num_workers()383 unsigned threading_control::max_num_workers() {
384     global_mutex_type::scoped_lock lock(g_threading_control_mutex);
385     return g_threading_control ? g_threading_control->my_pimpl->max_num_workers() : 0;
386 }
387 
adjust_demand(threading_control_client client,int mandatory_delta,int workers_delta)388 void threading_control::adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta) {
389     my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
390 }
391 
get_waiting_threads_monitor()392 thread_control_monitor& threading_control::get_waiting_threads_monitor() {
393     return my_pimpl->get_waiting_threads_monitor();
394 }
395 
396 } // r1
397 } // detail
398 } // tbb
399