/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "threading_control.h"
#include "permit_manager.h"
#include "market.h"
#include "thread_dispatcher.h"
#include "governor.h"
#include "thread_dispatcher_client.h"

namespace tbb {
namespace detail {
namespace r1 {

// ---------------------------------------- threading_control_impl --------------------------------------------------------------

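// Reads the currently active value of a global_control parameter; the "_unsafe" suffix indicates that it
// does not take the global_control lock (cf. global_control_lock()/global_control_unlock() declared below).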
std::size_t global_control_active_value_unsafe(d1::global_control::parameter);

std::pair<unsigned, unsigned> threading_control_impl::calculate_workers_limits() {
    // Expecting that 4P is suitable for most applications.
    // Limit to 2P for large thread counts.
    // TODO: ask RML for max concurrency and possibly correct hard_limit
    unsigned factor = governor::default_num_threads() <= 128 ? 4 : 2;

    // The requested number of threads is intentionally not considered in
    // the computation of the hard limit, in order to separate responsibilities
    // and avoid complicated interactions between global_control and task_scheduler_init.
    // The threading control guarantees that at least 256 threads can be created.
    unsigned workers_app_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
    unsigned workers_hard_limit = max(max(factor * governor::default_num_threads(), 256u), workers_app_limit);
    unsigned workers_soft_limit = calc_workers_soft_limit(workers_hard_limit);

    return std::make_pair(workers_soft_limit, workers_hard_limit);
}
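
// calc_workers_soft_limit() turns the max_allowed_parallelism setting into a worker count; the "- 1"
// leaves a slot for the external (application) thread. Worked example (hypothetical machine, not part
// of the original code): with default_num_threads() == 16 and no global_control limits set, factor == 4,
// so workers_hard_limit == max(max(4 * 16, 256u), workers_app_limit) == 256 and workers_soft_limit == 15.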

unsigned threading_control_impl::calc_workers_soft_limit(unsigned workers_hard_limit) {
    unsigned workers_soft_limit{};
    unsigned soft_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);

    // If the user has not set a limit (yet), use the default value.
    workers_soft_limit = soft_limit != 0 ? soft_limit - 1 : governor::default_num_threads() - 1;

    if (workers_soft_limit >= workers_hard_limit) {
        workers_soft_limit = workers_hard_limit - 1;
    }

    return workers_soft_limit;
}
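
// Factory helpers: the permit manager created here is a market instance, and the thread dispatcher
// holds the RML server connection (my_server) that supplies worker threads.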

cache_aligned_unique_ptr<permit_manager> threading_control_impl::make_permit_manager(unsigned workers_soft_limit) {
    return make_cache_aligned_unique<market>(workers_soft_limit);
}

cache_aligned_unique_ptr<thread_dispatcher> threading_control_impl::make_thread_dispatcher(threading_control& tc,
                                                                                           unsigned workers_soft_limit,
                                                                                           unsigned workers_hard_limit)
{
    stack_size_type stack_size = global_control_active_value_unsafe(global_control::thread_stack_size);

    cache_aligned_unique_ptr<thread_dispatcher> td =
        make_cache_aligned_unique<thread_dispatcher>(tc, workers_hard_limit, stack_size);
    // This check relies on the fact that, for shared RML, default_concurrency == max_concurrency.
    if (!governor::UsePrivateRML && td->my_server->default_concurrency() < workers_soft_limit) {
        runtime_warning("RML might limit the number of workers to %u while %u is requested.\n",
            td->my_server->default_concurrency(), workers_soft_limit);
    }

    return td;
}
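
// The constructor below wires the pieces together: the permit manager (a market) is created first, then
// the thread dispatcher, then the request serializer that forwards thread requests from the permit
// manager to the dispatcher and is registered as the permit manager's thread-request observer; the
// cancellation disseminator and the waiting-threads monitor are created last.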

threading_control_impl::threading_control_impl(threading_control* tc) {
    unsigned workers_soft_limit{}, workers_hard_limit{};
    std::tie(workers_soft_limit, workers_hard_limit) = calculate_workers_limits();

    my_permit_manager = make_permit_manager(workers_soft_limit);
    my_thread_dispatcher = make_thread_dispatcher(*tc, workers_soft_limit, workers_hard_limit);
    my_thread_request_serializer =
        make_cache_aligned_unique<thread_request_serializer_proxy>(*my_thread_dispatcher, workers_soft_limit);
    my_permit_manager->set_thread_request_observer(*my_thread_request_serializer);

    my_cancellation_disseminator = make_cache_aligned_unique<cancellation_disseminator>();
    my_waiting_threads_monitor = make_cache_aligned_unique<thread_control_monitor>();
}

void threading_control_impl::release(bool blocking_terminate) {
    my_thread_dispatcher->release(blocking_terminate);
}

void threading_control_impl::set_active_num_workers(unsigned soft_limit) {
    __TBB_ASSERT(soft_limit <= my_thread_dispatcher->my_num_workers_hard_limit, nullptr);
    my_thread_request_serializer->set_active_num_workers(soft_limit);
    my_permit_manager->set_active_num_workers(soft_limit);
}
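
// Client (arena) lifecycle: create_client() builds the permit-manager and thread-dispatcher clients,
// publish_client() registers them with both components, and destruction goes through
// prepare_client_destruction() followed by try_destroy_client(), which uses the recorded ABA epoch and
// priority level to unregister the dispatcher client safely.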

threading_control_client threading_control_impl::create_client(arena& a) {
    pm_client* pm_client = my_permit_manager->create_client(a);
    thread_dispatcher_client* td_client = my_thread_dispatcher->create_client(a);

    return threading_control_client{pm_client, td_client};
}

threading_control_impl::client_snapshot threading_control_impl::prepare_client_destruction(threading_control_client client) {
    auto td_client = client.get_thread_dispatcher_client();
    return {td_client->get_aba_epoch(), td_client->priority_level(), td_client, client.get_pm_client()};
}

bool threading_control_impl::try_destroy_client(threading_control_impl::client_snapshot snapshot) {
    if (my_thread_dispatcher->try_unregister_client(snapshot.my_td_client, snapshot.aba_epoch, snapshot.priority_level)) {
        my_permit_manager->unregister_and_destroy_client(*snapshot.my_pm_client);
        return true;
    }
    return false;
}

void threading_control_impl::publish_client(threading_control_client tc_client) {
    my_permit_manager->register_client(tc_client.get_pm_client());
    my_thread_dispatcher->register_client(tc_client.get_thread_dispatcher_client());
}

void threading_control_impl::register_thread(thread_data& td) {
    my_cancellation_disseminator->register_thread(td);
}
void threading_control_impl::unregister_thread(thread_data& td) {
    my_cancellation_disseminator->unregister_thread(td);
}

void threading_control_impl::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                        d1::task_group_context& src, uint32_t new_state)
{
    my_cancellation_disseminator->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control_impl::worker_stack_size() {
    return my_thread_dispatcher->worker_stack_size();
}

unsigned threading_control_impl::max_num_workers() {
    return my_thread_dispatcher->my_num_workers_hard_limit;
}

void threading_control_impl::adjust_demand(threading_control_client tc_client, int mandatory_delta, int workers_delta) {
    auto& c = *tc_client.get_pm_client();
    my_thread_request_serializer->register_mandatory_request(mandatory_delta);
    my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
    return *my_waiting_threads_monitor;
}

// ---------------------------------------- threading_control -------------------------------------------------------------------

// Defined in global_control.cpp
void global_control_lock();
void global_control_unlock();

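// Reference counting for the global threading_control instance: my_ref_count counts every reference
// (public and private), while my_public_ref_count counts only the "public" ones taken, e.g., through
// register_public_reference(), register_lifetime_control(), or an active scheduler handle; private
// references come from internal clients such as arenas (see create_client()).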
void threading_control::add_ref(bool is_public) {
    ++my_ref_count;
    if (is_public) {
        ++my_public_ref_count;
    }
}

bool threading_control::remove_ref(bool is_public) {
    if (is_public) {
        __TBB_ASSERT(g_threading_control == this, "Global threading control instance was destroyed prematurely?");
        __TBB_ASSERT(my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        --my_public_ref_count;
    }

    bool is_last_ref = --my_ref_count == 0;
    if (is_last_ref) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        g_threading_control = nullptr;
    }

    return is_last_ref;
}

threading_control* threading_control::get_threading_control(bool is_public) {
    threading_control* control = g_threading_control;
    if (control) {
        control->add_ref(is_public);
    }

    return control;
}
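
// create_threading_control() performs a double-checked creation of the singleton: the global_control
// lock is acquired before g_threading_control_mutex to keep a consistent lock order, and the new
// instance is published to g_threading_control only after its pimpl has been constructed; on exception
// the lock is released and the partially constructed object is freed.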

threading_control* threading_control::create_threading_control() {
    // The global_control lock should be acquired before the threading control mutex
    global_control_lock();

    threading_control* thr_control{ nullptr };
    try_call([&] {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);

        thr_control = get_threading_control(/*public = */ true);
        if (thr_control == nullptr) {
            thr_control = new (cache_aligned_allocate(sizeof(threading_control))) threading_control(/*public_ref = */ 1, /*private_ref = */ 1);
            thr_control->my_pimpl = make_cache_aligned_unique<threading_control_impl>(thr_control);

            __TBB_InitOnce::add_ref();

            if (global_control_active_value_unsafe(global_control::scheduler_handle)) {
                ++thr_control->my_public_ref_count;
                ++thr_control->my_ref_count;
            }

            g_threading_control = thr_control;
        }
    }).on_exception([&] {
        global_control_unlock();

        cache_aligned_deleter deleter{};
        deleter(thr_control);
    });

    global_control_unlock();
    return thr_control;
}

void threading_control::destroy() {
    cache_aligned_deleter deleter;
    deleter(this);
    __TBB_InitOnce::remove_ref();
}

void threading_control::wait_last_reference(global_mutex_type::scoped_lock& lock) {
    while (my_public_ref_count.load(std::memory_order_relaxed) == 1 && my_ref_count.load(std::memory_order_relaxed) > 1) {
        lock.release();
        // To guarantee that request_close_connection() is called by the last external thread, we need to wait until all
        // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
        // Theoretically, new private references to the threading control can be added during the wait, making it
        // potentially endless.
        // TODO: revise why the weak scheduler needs the threading control's pointer and try to remove this wait.
        // Note that the threading control should know about its schedulers for cancellation/exception/priority propagation,
        // see e.g. task_group_context::cancel_group_execution()
        while (my_public_ref_count.load(std::memory_order_acquire) == 1 && my_ref_count.load(std::memory_order_acquire) > 1) {
            yield();
        }
        lock.acquire(g_threading_control_mutex);
    }
}
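
// release() drops one reference; when the last reference goes away, the thread dispatcher is released
// (waiting for the workers when blocking_terminate is set), and the function returns true only if such
// a blocking termination was actually performed.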

bool threading_control::release(bool is_public, bool blocking_terminate) {
    bool do_release = false;
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        if (blocking_terminate) {
            __TBB_ASSERT(is_public, "Only an object with a public reference can request the blocking terminate");
            wait_last_reference(lock);
        }
        do_release = remove_ref(is_public);
    }

    if (do_release) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), "No public references must remain if we remove the threading control.");
        // Inform RML that blocking termination is required
        my_pimpl->release(blocking_terminate);
        return blocking_terminate;
    }
    return false;
}

threading_control::threading_control(unsigned public_ref, unsigned ref) : my_public_ref_count(public_ref), my_ref_count(ref)
{}

threading_control* threading_control::register_public_reference() {
    threading_control* control{nullptr};
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    control = get_threading_control(/*public = */ true);
    if (!control) {
        // We are going to create the threading control, so the mutexes must be acquired in the right order; release this lock first
        lock.release();
        control = create_threading_control();
    }

    return control;
}

bool threading_control::unregister_public_reference(bool blocking_terminate) {
    __TBB_ASSERT(g_threading_control, "Threading control should exist until the last public reference is released");
    __TBB_ASSERT(g_threading_control->my_public_ref_count.load(std::memory_order_relaxed), nullptr);
    return g_threading_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
}

threading_control_client threading_control::create_client(arena& a) {
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        add_ref(/*public = */ false);
    }

    return my_pimpl->create_client(a);
}

void threading_control::publish_client(threading_control_client client) {
    return my_pimpl->publish_client(client);
}

threading_control::client_snapshot threading_control::prepare_client_destruction(threading_control_client client) {
    return my_pimpl->prepare_client_destruction(client);
}

bool threading_control::try_destroy_client(threading_control::client_snapshot deleter) {
    bool res = my_pimpl->try_destroy_client(deleter);
    if (res) {
        release(/*public = */ false, /*blocking_terminate = */ false);
    }
    return res;
}
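
// set_active_num_workers() takes a temporary private reference so the implementation stays alive while
// the new soft limit is applied; if no threading control exists yet, the call is a no-op.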

void threading_control::set_active_num_workers(unsigned soft_limit) {
    threading_control* thr_control = get_threading_control(/*public = */ false);
    if (thr_control != nullptr) {
        thr_control->my_pimpl->set_active_num_workers(soft_limit);
        thr_control->release(/*is_public=*/false, /*blocking_terminate=*/false);
    }
}

bool threading_control::is_present() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control != nullptr;
}

bool threading_control::register_lifetime_control() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return get_threading_control(/*public = */ true) != nullptr;
}

bool threading_control::unregister_lifetime_control(bool blocking_terminate) {
    threading_control* thr_control{nullptr};
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        thr_control = g_threading_control;
    }

    bool released{true};
    if (thr_control) {
        released = thr_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
    }

    return released;
}

void threading_control::register_thread(thread_data& td) {
    my_pimpl->register_thread(td);
}

void threading_control::unregister_thread(thread_data& td) {
    my_pimpl->unregister_thread(td);
}

void threading_control::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                   d1::task_group_context& src, uint32_t new_state)
{
    my_pimpl->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control::worker_stack_size() {
    return my_pimpl->worker_stack_size();
}

unsigned threading_control::max_num_workers() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control ? g_threading_control->my_pimpl->max_num_workers() : 0;
}

void threading_control::adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta) {
    my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control::get_waiting_threads_monitor() {
    return my_pimpl->get_waiting_threads_monitor();
}

} // r1
} // detail
} // tbb