/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "threading_control.h"
#include "permit_manager.h"
#include "market.h"
#include "tcm_adaptor.h"
#include "thread_dispatcher.h"
#include "governor.h"
#include "thread_dispatcher_client.h"

namespace tbb {
namespace detail {
namespace r1 {

// ---------------------------------------- threading_control_impl --------------------------------------------------------------

std::size_t global_control_active_value_unsafe(d1::global_control::parameter);

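// Computes the (workers_soft_limit, workers_hard_limit) pair used below to size the permit manager,
// the thread request serializer, and the thread dispatcher.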
std::pair<unsigned, unsigned> threading_control_impl::calculate_workers_limits() {
    // Expect 4P to be suitable for most applications.
    // Limit the factor to 2P for machines with a large number of hardware threads.
    // TODO: ask RML for max concurrency and possibly correct hard_limit
    unsigned factor = governor::default_num_threads() <= 128 ? 4 : 2;

    // The requested number of threads is intentionally not considered in the
    // computation of the hard limit, in order to separate responsibilities
    // and avoid complicated interactions between global_control and task_scheduler_init.
    // The threading control guarantees that at least 256 worker threads can be created.
    unsigned workers_app_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
    unsigned workers_hard_limit = max(max(factor * governor::default_num_threads(), 256u), workers_app_limit);
    unsigned workers_soft_limit = calc_workers_soft_limit(workers_hard_limit);

    return std::make_pair(workers_soft_limit, workers_hard_limit);
}
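// Illustrative arithmetic (a sketch, not normative): assume governor::default_num_threads() == 16 and
// no max_allowed_parallelism limit set through global_control, so workers_app_limit does not exceed 16.
// Then factor == 4 and
//     workers_hard_limit = max(max(4 * 16, 256u), workers_app_limit) = 256
//     workers_soft_limit = 16 - 1 = 15   (see calc_workers_soft_limit() below)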

unsigned threading_control_impl::calc_workers_soft_limit(unsigned workers_hard_limit) {
    unsigned workers_soft_limit{};
    unsigned soft_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);

    // If the user has not set a limit (yet), use the default value.
    workers_soft_limit = soft_limit != 0 ? soft_limit - 1 : governor::default_num_threads() - 1;

    if (workers_soft_limit >= workers_hard_limit) {
        workers_soft_limit = workers_hard_limit - 1;
    }

    return workers_soft_limit;
}

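// Prefers the tcm_adaptor when TCM is initialized and a connection is established; otherwise falls
// back to the market as the permit manager.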
cache_aligned_unique_ptr<permit_manager> threading_control_impl::make_permit_manager(unsigned workers_soft_limit) {
    if (tcm_adaptor::is_initialized()) {
        auto tcm = make_cache_aligned_unique<tcm_adaptor>();
        if (tcm->is_connected()) {
            return tcm;
        }
    }
    return make_cache_aligned_unique<market>(workers_soft_limit);
}

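// Creates the thread dispatcher with the worker hard limit and the configured stack size, and warns
// if a shared RML server cannot provide workers_soft_limit workers.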
cache_aligned_unique_ptr<thread_dispatcher> threading_control_impl::make_thread_dispatcher(threading_control& tc,
                                                                                           unsigned workers_soft_limit,
                                                                                           unsigned workers_hard_limit)
{
    stack_size_type stack_size = global_control_active_value_unsafe(global_control::thread_stack_size);

    cache_aligned_unique_ptr<thread_dispatcher> td =
        make_cache_aligned_unique<thread_dispatcher>(tc, workers_hard_limit, stack_size);
    // This check relies on the fact that, for a shared RML, default_concurrency == max_concurrency.
    if (!governor::UsePrivateRML && td->my_server->default_concurrency() < workers_soft_limit) {
        runtime_warning("RML might limit the number of workers to %u while %u is requested.\n",
            td->my_server->default_concurrency(), workers_soft_limit);
    }

    return td;
}

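// Construction order matters: the permit manager and the thread dispatcher are created first, then
// the thread request serializer is bound to the dispatcher and installed as the permit manager's
// thread request observer. The cancellation disseminator and the waiting-threads monitor are created last.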
threading_control_impl::threading_control_impl(threading_control* tc) {
    unsigned workers_soft_limit{}, workers_hard_limit{};
    std::tie(workers_soft_limit, workers_hard_limit) = calculate_workers_limits();

    my_permit_manager = make_permit_manager(workers_soft_limit);
    my_thread_dispatcher = make_thread_dispatcher(*tc, workers_soft_limit, workers_hard_limit);
    my_thread_request_serializer =
        make_cache_aligned_unique<thread_request_serializer_proxy>(*my_thread_dispatcher, workers_soft_limit);
    my_permit_manager->set_thread_request_observer(*my_thread_request_serializer);

    my_cancellation_disseminator = make_cache_aligned_unique<cancellation_disseminator>();
    my_waiting_threads_monitor = make_cache_aligned_unique<thread_control_monitor>();
}

void threading_control_impl::release(bool blocking_terminate) {
    my_thread_dispatcher->release(blocking_terminate);
}

void threading_control_impl::set_active_num_workers(unsigned soft_limit) {
    __TBB_ASSERT(soft_limit <= my_thread_dispatcher->my_num_workers_hard_limit, nullptr);
    my_thread_request_serializer->set_active_num_workers(soft_limit);
    my_permit_manager->set_active_num_workers(soft_limit);
}

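// Creates the per-arena permit manager and thread dispatcher clients and bundles them into a
// threading_control_client; the clients are registered with their managers later, in publish_client().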
threading_control_client threading_control_impl::create_client(arena& a) {
    pm_client* pm_client = my_permit_manager->create_client(a);
    thread_dispatcher_client* td_client = my_thread_dispatcher->create_client(a);

    return threading_control_client{pm_client, td_client};
}

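// Client teardown is two-phase: prepare_client_destruction() captures a snapshot (ABA epoch,
// priority level, and both client pointers), which try_destroy_client() then uses to unregister the
// thread dispatcher client and, on success, destroy the permit manager client as well.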
threading_control_impl::client_snapshot threading_control_impl::prepare_client_destruction(threading_control_client client) {
    auto td_client = client.get_thread_dispatcher_client();
    return {td_client->get_aba_epoch(), td_client->priority_level(), td_client, client.get_pm_client()};
}

bool threading_control_impl::try_destroy_client(threading_control_impl::client_snapshot snapshot) {
    if (my_thread_dispatcher->try_unregister_client(snapshot.my_td_client, snapshot.aba_epoch, snapshot.priority_level)) {
        my_permit_manager->unregister_and_destroy_client(*snapshot.my_pm_client);
        return true;
    }
    return false;
}

void threading_control_impl::publish_client(threading_control_client tc_client, d1::constraints& constraints) {
    my_permit_manager->register_client(tc_client.get_pm_client(), constraints);
    my_thread_dispatcher->register_client(tc_client.get_thread_dispatcher_client());
}

void threading_control_impl::register_thread(thread_data& td) {
    my_cancellation_disseminator->register_thread(td);
}

void threading_control_impl::unregister_thread(thread_data& td) {
    my_cancellation_disseminator->unregister_thread(td);
}

void threading_control_impl::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                        d1::task_group_context& src, uint32_t new_state)
{
    my_cancellation_disseminator->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control_impl::worker_stack_size() {
    return my_thread_dispatcher->worker_stack_size();
}

unsigned threading_control_impl::max_num_workers() {
    return my_thread_dispatcher->my_num_workers_hard_limit;
}

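// Registers the mandatory-concurrency delta with the thread request serializer first, then forwards
// both deltas to the permit manager on behalf of the given client.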
void threading_control_impl::adjust_demand(threading_control_client tc_client, int mandatory_delta, int workers_delta) {
    auto& c = *tc_client.get_pm_client();
    my_thread_request_serializer->register_mandatory_request(mandatory_delta);
    my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
    return *my_waiting_threads_monitor;
}

// ---------------------------------------- threading_control -------------------------------------------------------------------

// Defined in global_control.cpp
void global_control_lock();
void global_control_unlock();

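// threading_control maintains two reference counters: my_ref_count counts all references, while
// my_public_ref_count tracks only the public ones (taken via register_public_reference(),
// register_lifetime_control(), and the scheduler_handle bump in create_threading_control()).
// When the total count reaches zero, remove_ref() clears g_threading_control.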
void threading_control::add_ref(bool is_public) {
    ++my_ref_count;
    if (is_public) {
        my_public_ref_count++;
    }
}

bool threading_control::remove_ref(bool is_public) {
    if (is_public) {
        __TBB_ASSERT(g_threading_control == this, "Global threading control instance was destroyed prematurely?");
        __TBB_ASSERT(my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        --my_public_ref_count;
    }

    bool is_last_ref = --my_ref_count == 0;
    if (is_last_ref) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        g_threading_control = nullptr;
    }

    return is_last_ref;
}

threading_control* threading_control::get_threading_control(bool is_public) {
    threading_control* control = g_threading_control;
    if (control) {
        control->add_ref(is_public);
    }

    return control;
}

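// Lazily creates the global threading_control instance under g_threading_control_mutex. If a
// scheduler_handle is active, one extra public and one extra private reference are added
// (presumably so the instance stays alive for the handle's lifetime).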
threading_control* threading_control::create_threading_control() {
    // The global control mutex should be locked before the threading_control_impl is created.
    global_control_lock();

    threading_control* thr_control{ nullptr };
    try_call([&] {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);

        thr_control = get_threading_control(/*public = */ true);
        if (thr_control == nullptr) {
            thr_control = new (cache_aligned_allocate(sizeof(threading_control))) threading_control(/*public_ref = */ 1, /*private_ref = */ 1);
            thr_control->my_pimpl = make_cache_aligned_unique<threading_control_impl>(thr_control);

            __TBB_InitOnce::add_ref();

            if (global_control_active_value_unsafe(global_control::scheduler_handle)) {
                ++thr_control->my_public_ref_count;
                ++thr_control->my_ref_count;
            }

            g_threading_control = thr_control;
        }
    }).on_exception([&] {
        global_control_unlock();

        cache_aligned_deleter deleter{};
        deleter(thr_control);
    });

    global_control_unlock();
    return thr_control;
}

void threading_control::destroy() {
    cache_aligned_deleter deleter;
    deleter(this);
    __TBB_InitOnce::remove_ref();
}

void threading_control::wait_last_reference(global_mutex_type::scoped_lock& lock) {
    while (my_public_ref_count.load(std::memory_order_relaxed) == 1 && my_ref_count.load(std::memory_order_relaxed) > 1) {
        lock.release();
        // To guarantee that request_close_connection() is called by the last external thread, we need to wait until all
        // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
        // Theoretically, new private references to the threading control can be added during the wait, making it
        // potentially endless.
        // TODO: revise why the weak scheduler needs the threading control's pointer and try to remove this wait.
        // Note that the threading control should know about its schedulers for cancellation/exception/priority propagation,
        // see e.g. task_group_context::cancel_group_execution().
        while (my_public_ref_count.load(std::memory_order_acquire) == 1 && my_ref_count.load(std::memory_order_acquire) > 1) {
            yield();
        }
        lock.acquire(g_threading_control_mutex);
    }
}

bool threading_control::release(bool is_public, bool blocking_terminate) {
    bool do_release = false;
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        if (blocking_terminate) {
            __TBB_ASSERT(is_public, "Only an object with a public reference can request the blocking terminate");
            wait_last_reference(lock);
        }
        do_release = remove_ref(is_public);
    }

    if (do_release) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), "No public references must remain if we remove the threading control.");
        // Inform RML that blocking termination is required.
        my_pimpl->release(blocking_terminate);
        return blocking_terminate;
    }
    return false;
}

threading_control::threading_control(unsigned public_ref, unsigned ref) : my_public_ref_count(public_ref), my_ref_count(ref)
{}

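// Illustrative usage of the public-reference API below (a sketch only; the actual callers live in
// other translation units):
//     threading_control* tc = threading_control::register_public_reference();
//     // ... create/publish clients, adjust demand, etc. ...
//     threading_control::unregister_public_reference(/*blocking_terminate*/ false);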
threading_control* threading_control::register_public_reference() {
    threading_control* control{nullptr};
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    control = get_threading_control(/*public = */ true);
    if (!control) {
        // We are going to create the threading_control_impl, so the mutexes must be acquired in the right order.
        lock.release();
        control = create_threading_control();
    }

    return control;
}

bool threading_control::unregister_public_reference(bool blocking_terminate) {
    __TBB_ASSERT(g_threading_control, "Threading control should exist until the last public reference is released");
    __TBB_ASSERT(g_threading_control->my_public_ref_count.load(std::memory_order_relaxed), nullptr);
    return g_threading_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
}

threading_control_client threading_control::create_client(arena& a) {
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        add_ref(/*public = */ false);
    }

    return my_pimpl->create_client(a);
}

void threading_control::publish_client(threading_control_client client, d1::constraints& constraints) {
    return my_pimpl->publish_client(client, constraints);
}

threading_control::client_snapshot threading_control::prepare_client_destruction(threading_control_client client) {
    return my_pimpl->prepare_client_destruction(client);
}

bool threading_control::try_destroy_client(threading_control::client_snapshot deleter) {
    bool res = my_pimpl->try_destroy_client(deleter);
    if (res) {
        release(/*public = */ false, /*blocking_terminate = */ false);
    }
    return res;
}

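// Applies a new soft limit to the existing threading control, if any, taking a temporary private
// reference so the instance cannot go away while the limit is being propagated.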
void threading_control::set_active_num_workers(unsigned soft_limit) {
    threading_control* thr_control = get_threading_control(/*public = */ false);
    if (thr_control != nullptr) {
        thr_control->my_pimpl->set_active_num_workers(soft_limit);
        thr_control->release(/*is_public=*/false, /*blocking_terminate=*/false);
    }
}

bool threading_control::is_present() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control != nullptr;
}

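// Lifetime control: register_lifetime_control() takes a public reference on an existing instance
// and reports whether one was present; unregister_lifetime_control() releases such a reference,
// optionally with a blocking terminate. These appear to be the hooks behind global_control's
// lifetime management (see global_control.cpp).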
bool threading_control::register_lifetime_control() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return get_threading_control(/*public = */ true) != nullptr;
}

bool threading_control::unregister_lifetime_control(bool blocking_terminate) {
    threading_control* thr_control{nullptr};
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        thr_control = g_threading_control;
    }

    bool released{true};
    if (thr_control) {
        released = thr_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
    }

    return released;
}

void threading_control::register_thread(thread_data& td) {
    my_pimpl->register_thread(td);
}

void threading_control::unregister_thread(thread_data& td) {
    my_pimpl->unregister_thread(td);
}

void threading_control::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                   d1::task_group_context& src, uint32_t new_state)
{
    my_pimpl->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control::worker_stack_size() {
    return my_pimpl->worker_stack_size();
}

unsigned threading_control::max_num_workers() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control ? g_threading_control->my_pimpl->max_num_workers() : 0;
}

void threading_control::adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta) {
    my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control::get_waiting_threads_monitor() {
    return my_pimpl->get_waiting_threads_monitor();
}

} // r1
} // detail
} // tbb