/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "threading_control.h"
#include "permit_manager.h"
#include "market.h"
#include "tcm_adaptor.h"
#include "thread_dispatcher.h"
#include "governor.h"
#include "thread_dispatcher_client.h"

namespace tbb {
namespace detail {
namespace r1 {

// ---------------------------------------- threading_control_impl --------------------------------------------------------------

// Forward declaration: reads the currently active value of the given global_control
// parameter without taking the global_control lock. Defined elsewhere (presumably
// global_control.cpp, alongside global_control_lock()/global_control_unlock() declared
// further below -- confirm against that file).
std::size_t global_control_active_value_unsafe(d1::global_control::parameter);

// Computes the (soft, hard) worker limits used to size the thread pool.
// Returns a pair {workers_soft_limit, workers_hard_limit}.
std::pair<unsigned, unsigned> threading_control_impl::calculate_workers_limits() {
    // Expecting that 4P is suitable for most applications.
    // Limit to 2P for large thread number.
    // TODO: ask RML for max concurrency and possibly correct hard_limit
    unsigned factor = governor::default_num_threads() <= 128 ? 4 : 2;

    // The requested number of threads is intentionally not considered in
    // computation of the hard limit, in order to separate responsibilities
    // and avoid complicated interactions between global_control and task_scheduler_init.
    // The threading control guarantees that at least 256 threads might be created.
    unsigned workers_app_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
    unsigned workers_hard_limit = max(max(factor * governor::default_num_threads(), 256u), workers_app_limit);
    unsigned workers_soft_limit = calc_workers_soft_limit(workers_hard_limit);

    return std::make_pair(workers_soft_limit, workers_hard_limit);
}

// Derives the soft limit (number of workers normally created) from the user's
// max_allowed_parallelism setting, clamped strictly below workers_hard_limit.
// The "- 1" accounts for the external (application) thread occupying one slot.
unsigned threading_control_impl::calc_workers_soft_limit(unsigned workers_hard_limit) {
    unsigned workers_soft_limit{};
    unsigned soft_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);

    // if user set no limits (yet), use default value
    workers_soft_limit = soft_limit != 0 ? soft_limit - 1 : governor::default_num_threads() - 1;

    if (workers_soft_limit >= workers_hard_limit) {
        workers_soft_limit = workers_hard_limit - 1;
    }

    return workers_soft_limit;
}

// Selects the permit manager implementation: prefer the TCM (Thread Composability
// Manager) adaptor when it is initialized and its connection succeeded; otherwise
// fall back to the classic market with the computed soft limit.
cache_aligned_unique_ptr<permit_manager> threading_control_impl::make_permit_manager(unsigned workers_soft_limit) {
    if (tcm_adaptor::is_initialized()) {
        auto tcm = make_cache_aligned_unique<tcm_adaptor>();
        if (tcm->is_connected()) {
            return tcm;
        }
    }
    return make_cache_aligned_unique<market>(workers_soft_limit);
}

// Creates the thread dispatcher (which connects to RML) sized by the hard limit
// and the active thread_stack_size setting. Emits a runtime warning if a shared
// RML server cannot supply as many workers as the soft limit requests.
cache_aligned_unique_ptr<thread_dispatcher> threading_control_impl::make_thread_dispatcher(threading_control& tc,
                                                                                           unsigned workers_soft_limit,
                                                                                           unsigned workers_hard_limit)
{
    stack_size_type stack_size = global_control_active_value_unsafe(global_control::thread_stack_size);

    cache_aligned_unique_ptr<thread_dispatcher> td =
        make_cache_aligned_unique<thread_dispatcher>(tc, workers_hard_limit, stack_size);
    // This check relies on the fact that for shared RML default_concurrency == max_concurrency
    if (!governor::UsePrivateRML && td->my_server->default_concurrency() < workers_soft_limit) {
        runtime_warning("RML might limit the number of workers to %u while %u is requested.\n",
                        td->my_server->default_concurrency(), workers_soft_limit);
    }

    return td;
}

// Builds all sub-components in dependency order: permit manager, thread dispatcher,
// the request serializer that mediates between them, then the cancellation
// disseminator and the waiting-threads monitor. The serializer is registered as the
// permit manager's thread-request observer before any client can issue requests.
threading_control_impl::threading_control_impl(threading_control* tc) {
    unsigned workers_soft_limit{}, workers_hard_limit{};
    std::tie(workers_soft_limit, workers_hard_limit) = calculate_workers_limits();

    my_permit_manager = make_permit_manager(workers_soft_limit);
    my_thread_dispatcher = make_thread_dispatcher(*tc, workers_soft_limit, workers_hard_limit);
    my_thread_request_serializer =
        make_cache_aligned_unique<thread_request_serializer_proxy>(*my_thread_dispatcher, workers_soft_limit);
    my_permit_manager->set_thread_request_observer(*my_thread_request_serializer);

    my_cancellation_disseminator = make_cache_aligned_unique<cancellation_disseminator>();
    my_waiting_threads_monitor = make_cache_aligned_unique<thread_control_monitor>();
}

// Forwards termination to the thread dispatcher; blocking_terminate requests that
// the RML connection close only after workers have finished.
void threading_control_impl::release(bool blocking_terminate) {
    my_thread_dispatcher->release(blocking_terminate);
}

// Applies a new soft limit at runtime. Must not exceed the hard limit fixed at
// construction. Both the serializer and the permit manager are informed.
void threading_control_impl::set_active_num_workers(unsigned soft_limit) {
    __TBB_ASSERT(soft_limit <= my_thread_dispatcher->my_num_workers_hard_limit, nullptr);
    my_thread_request_serializer->set_active_num_workers(soft_limit);
    my_permit_manager->set_active_num_workers(soft_limit);
}

// Creates paired permit-manager and thread-dispatcher clients for an arena and
// bundles them into a single threading_control_client handle.
threading_control_client threading_control_impl::create_client(arena& a) {
    pm_client* pm_client = my_permit_manager->create_client(a);
    thread_dispatcher_client* td_client = my_thread_dispatcher->create_client(a);

    return threading_control_client{pm_client, td_client};
}

// Captures the state needed to later attempt client destruction: the dispatcher
// client's ABA epoch and priority level (used by try_unregister_client to detect
// concurrent reuse) plus both client pointers.
threading_control_impl::client_snapshot threading_control_impl::prepare_client_destruction(threading_control_client client) {
    auto td_client = client.get_thread_dispatcher_client();
    return {td_client->get_aba_epoch(), td_client->priority_level(), td_client, client.get_pm_client()};
}

// Attempts to destroy the snapshotted client pair. The permit-manager client is
// destroyed only if the dispatcher-side unregistration succeeds; returns whether
// destruction happened.
bool threading_control_impl::try_destroy_client(threading_control_impl::client_snapshot snapshot) {
    if (my_thread_dispatcher->try_unregister_client(snapshot.my_td_client, snapshot.aba_epoch, snapshot.priority_level)) {
        my_permit_manager->unregister_and_destroy_client(*snapshot.my_pm_client);
        return true;
    }
    return false;
}

// Makes a previously created client visible: registers it with the permit manager
// (applying the arena's constraints) and with the thread dispatcher.
void threading_control_impl::publish_client(threading_control_client tc_client, d1::constraints& constraints) {
    my_permit_manager->register_client(tc_client.get_pm_client(), constraints);
    my_thread_dispatcher->register_client(tc_client.get_thread_dispatcher_client());
}

// Registers a thread with the cancellation disseminator so that task-group state
// changes (cancellation/exceptions) can be propagated to it.
void threading_control_impl::register_thread(thread_data& td) {
    my_cancellation_disseminator->register_thread(td);
}
void threading_control_impl::unregister_thread(thread_data& td) {
    my_cancellation_disseminator->unregister_thread(td);
}

// Propagates a task_group_context state change (identified by the member pointer
// mptr_state) from context src to all registered threads.
void threading_control_impl::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                        d1::task_group_context& src, uint32_t new_state)
{
    my_cancellation_disseminator->propagate_task_group_state(mptr_state, src, new_state);
}

// Stack size, in bytes, used for worker threads (fixed at dispatcher creation).
std::size_t threading_control_impl::worker_stack_size() {
    return my_thread_dispatcher->worker_stack_size();
}

// Hard upper bound on the number of workers, fixed at construction.
unsigned threading_control_impl::max_num_workers() {
    return my_thread_dispatcher->my_num_workers_hard_limit;
}

// Adjusts a client's demand for workers. The mandatory delta is routed through the
// request serializer first, then both deltas go to the permit manager.
void threading_control_impl::adjust_demand(threading_control_client tc_client, int mandatory_delta, int workers_delta) {
    auto& c = *tc_client.get_pm_client();
    my_thread_request_serializer->register_mandatory_request(mandatory_delta);
    my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
    return *my_waiting_threads_monitor;
}

// ---------------------------------------- threading_control -------------------------------------------------------------------

// Defined in global_control.cpp
void global_control_lock();
void global_control_unlock();

// Adds one private reference, and additionally one public reference when is_public.
// All call sites visible in this file hold g_threading_control_mutex.
void threading_control::add_ref(bool is_public) {
    ++my_ref_count;
    if (is_public) {
        my_public_ref_count++;
    }
}

// Drops one (public + private, or private-only) reference. Returns true when the
// last reference is gone, in which case the global pointer is cleared; the caller
// is then responsible for tearing down the instance.
bool threading_control::remove_ref(bool is_public) {
    if (is_public) {
        __TBB_ASSERT(g_threading_control == this, "Global threading control instance was destroyed prematurely?");
        __TBB_ASSERT(my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        --my_public_ref_count;
    }

    bool is_last_ref = --my_ref_count == 0;
    if (is_last_ref) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        g_threading_control = nullptr;
    }

    return is_last_ref;
}

// Returns the global instance with an extra reference added, or nullptr if none
// exists. Callers in this file invoke it under g_threading_control_mutex.
threading_control* threading_control::get_threading_control(bool is_public) {
    threading_control* control = g_threading_control;
    if (control) {
        control->add_ref(is_public);
    }

    return control;
}

// Returns the global instance, creating it on first use (double-checked under
// g_threading_control_mutex). Starts with one public and one private reference;
// an active scheduler_handle adds one more of each. On exception the global_control
// lock is released and the partially built instance is freed.
threading_control* threading_control::create_threading_control() {
    // Global control should be locked before threading_control_impl
    global_control_lock();

    threading_control* thr_control{ nullptr };
    try_call([&] {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);

        thr_control = get_threading_control(/*public = */ true);
        if (thr_control == nullptr) {
            thr_control = new (cache_aligned_allocate(sizeof(threading_control))) threading_control(/*public_ref = */ 1, /*private_ref = */ 1);
            thr_control->my_pimpl = make_cache_aligned_unique<threading_control_impl>(thr_control);

            __TBB_InitOnce::add_ref();

            if (global_control_active_value_unsafe(global_control::scheduler_handle)) {
                ++thr_control->my_public_ref_count;
                ++thr_control->my_ref_count;
            }

            g_threading_control = thr_control;
        }
    }).on_exception([&] {
        global_control_unlock();

        cache_aligned_deleter deleter{};
        deleter(thr_control);
    });

    global_control_unlock();
    return thr_control;
}

// Frees this instance (placement-allocated in create_threading_control) and drops
// the __TBB_InitOnce reference taken there.
void threading_control::destroy () {
    cache_aligned_deleter deleter;
    deleter(this);
    __TBB_InitOnce::remove_ref();
}

// Spins until this external thread holds the only remaining reference, temporarily
// releasing the passed g_threading_control_mutex lock while yielding; reacquires
// it before returning.
void threading_control::wait_last_reference(global_mutex_type::scoped_lock& lock) {
    while (my_public_ref_count.load(std::memory_order_relaxed) == 1 && my_ref_count.load(std::memory_order_relaxed) > 1) {
        lock.release();
        // To guarantee that request_close_connection() is called by the last external thread, we need to wait till all
        // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
        // Theoretically, new private references to the threading control can be added during waiting making it potentially
        // endless.
        // TODO: revise why the weak scheduler needs threading control's pointer and try to remove this wait.
        // Note that the threading control should know about its schedulers for cancellation/exception/priority propagation,
        // see e.g. task_group_context::cancel_group_execution()
        while (my_public_ref_count.load(std::memory_order_acquire) == 1 && my_ref_count.load(std::memory_order_acquire) > 1) {
            yield();
        }
        lock.acquire(g_threading_control_mutex);
    }
}

// Drops a reference under the global mutex; a blocking terminate (public refs only)
// first waits to become the last reference holder. If this was the last reference,
// tells the implementation to release (informing RML whether termination blocks)
// and returns blocking_terminate; otherwise returns false.
bool threading_control::release(bool is_public, bool blocking_terminate) {
    bool do_release = false;
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        if (blocking_terminate) {
            __TBB_ASSERT(is_public, "Only an object with a public reference can request the blocking terminate");
            wait_last_reference(lock);
        }
        do_release = remove_ref(is_public);
    }

    if (do_release) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), "No public references must remain if we remove the threading control.");
        // inform RML that blocking termination is required
        my_pimpl->release(blocking_terminate);
        return blocking_terminate;
    }
    return false;
}

threading_control::threading_control(unsigned public_ref, unsigned ref) : my_public_ref_count(public_ref), my_ref_count(ref)
{}

// Returns the global instance with a public reference added, creating it if needed.
// The mutex is dropped before creation to respect the lock ordering documented in
// create_threading_control (global_control lock before g_threading_control_mutex).
threading_control* threading_control::register_public_reference() {
    threading_control* control{nullptr};
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    control = get_threading_control(/*public = */ true);
    if (!control) {
        // We are going to create threading_control_impl, we should acquire mutexes in right order
        lock.release();
        control = create_threading_control();
    }

    return control;
}

// Releases a public reference on the global instance; returns the result of
// release() (true only for a completed blocking terminate).
bool threading_control::unregister_public_reference(bool blocking_terminate) {
    __TBB_ASSERT(g_threading_control, "Threading control should exist until last public reference");
    __TBB_ASSERT(g_threading_control->my_public_ref_count.load(std::memory_order_relaxed), nullptr);
    return g_threading_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
}

// Creates a client for the arena, taking one private reference on this threading
// control (paired with the release() in try_destroy_client).
threading_control_client threading_control::create_client(arena& a) {
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        add_ref(/*public = */ false);
    }

    return my_pimpl->create_client(a);
}

// Makes a created client visible to the permit manager and thread dispatcher.
void threading_control::publish_client(threading_control_client client, d1::constraints& constraints) {
    return my_pimpl->publish_client(client, constraints);
}

// Snapshots the client state needed for a later try_destroy_client attempt.
threading_control::client_snapshot threading_control::prepare_client_destruction(threading_control_client client) {
    return my_pimpl->prepare_client_destruction(client);
}

// Attempts to destroy the snapshotted client; on success also drops the private
// reference taken in create_client.
bool threading_control::try_destroy_client(threading_control::client_snapshot deleter) {
    bool res = my_pimpl->try_destroy_client(deleter);
    if (res) {
        release(/*public = */ false, /*blocking_terminate = */ false);
    }
    return res;
}

// Applies a new soft worker limit to the global instance, if one exists. A
// temporary private reference keeps the instance alive for the duration.
void threading_control::set_active_num_workers(unsigned soft_limit) {
    threading_control* thr_control = get_threading_control(/*public = */ false);
    if (thr_control != nullptr) {
        thr_control->my_pimpl->set_active_num_workers(soft_limit);
        thr_control->release(/*is_public=*/false, /*blocking_terminate=*/false);
    }
}

// True if a global threading control instance currently exists.
bool threading_control::is_present() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control != nullptr;
}

// Takes a public (lifetime) reference on an existing instance without creating
// one; returns whether an instance existed.
bool threading_control::register_lifetime_control() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return get_threading_control(/*public = */ true) != nullptr;
}

// Releases a lifetime (public) reference if an instance exists; returns true if
// nothing needed releasing, otherwise the result of release().
bool threading_control::unregister_lifetime_control(bool blocking_terminate) {
    threading_control* thr_control{nullptr};
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        thr_control = g_threading_control;
    }

    bool released{true};
    if (thr_control) {
        released = thr_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
    }

    return released;
}

// Thin forwarders to the implementation (see threading_control_impl above).
void threading_control::register_thread(thread_data& td) {
    my_pimpl->register_thread(td);
}

void threading_control::unregister_thread(thread_data& td) {
    my_pimpl->unregister_thread(td);
}

void threading_control::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                   d1::task_group_context& src, uint32_t new_state)
{
    my_pimpl->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control::worker_stack_size() {
    return my_pimpl->worker_stack_size();
}

// Hard worker limit of the global instance, or 0 when no instance exists.
// Static: reads g_threading_control under the global mutex.
unsigned threading_control::max_num_workers() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control ? g_threading_control->my_pimpl->max_num_workers() : 0;
}

void threading_control::adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta) {
    my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control::get_waiting_threads_monitor() {
    return my_pimpl->get_waiting_threads_monitor();
}

} // r1
} // detail
} // tbb