/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "threading_control.h"
#include "permit_manager.h"
#include "market.h"
#include "thread_dispatcher.h"
#include "governor.h"
#include "thread_dispatcher_client.h"

namespace tbb {
namespace detail {
namespace r1 {

// ---------------------------------------- threading_control_impl --------------------------------------------------------------

// Reads the currently active value of a global_control parameter without taking
// the global_control lock. NOTE(review): presumably defined in global_control.cpp
// next to global_control_lock()/global_control_unlock() declared below -- confirm.
std::size_t global_control_active_value_unsafe(d1::global_control::parameter);

// Computes the pair (workers_soft_limit, workers_hard_limit) used to size the
// worker pool. The hard limit is at least 256 and is raised further if the
// application requested more parallelism via global_control.
std::pair<unsigned, unsigned> threading_control_impl::calculate_workers_limits() {
    // Expecting that 4P is suitable for most applications.
    // Limit to 2P for large thread number.
    // TODO: ask RML for max concurrency and possibly correct hard_limit
    unsigned factor = governor::default_num_threads() <= 128 ? 4 : 2;

    // The requested number of threads is intentionally not considered in
    // computation of the hard limit, in order to separate responsibilities
    // and avoid complicated interactions between global_control and task_scheduler_init.
    // The threading control guarantees that at least 256 threads might be created.
    unsigned workers_app_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);
    unsigned workers_hard_limit = max(max(factor * governor::default_num_threads(), 256u), workers_app_limit);
    unsigned workers_soft_limit = calc_workers_soft_limit(workers_hard_limit);

    return std::make_pair(workers_soft_limit, workers_hard_limit);
}

// Computes the soft (active) worker limit: the user-set max_allowed_parallelism
// if any, otherwise the hardware default -- in both cases minus one (presumably
// a slot reserved for the external thread, mirroring the
// default_num_threads() - 1 branch; confirm against callers). The result is
// always kept strictly below workers_hard_limit.
unsigned threading_control_impl::calc_workers_soft_limit(unsigned workers_hard_limit) {
    unsigned workers_soft_limit{};
    unsigned soft_limit = global_control_active_value_unsafe(global_control::max_allowed_parallelism);

    // if user set no limits (yet), use default value
    workers_soft_limit = soft_limit != 0 ? soft_limit - 1 : governor::default_num_threads() - 1;

    if (workers_soft_limit >= workers_hard_limit) {
        workers_soft_limit = workers_hard_limit - 1;
    }

    return workers_soft_limit;
}

// Factory for the permit manager; market is the concrete implementation.
cache_aligned_unique_ptr<permit_manager> threading_control_impl::make_permit_manager(unsigned workers_soft_limit) {
    return make_cache_aligned_unique<market>(workers_soft_limit);
}

// Factory for the thread dispatcher. Emits a runtime warning (but still
// proceeds) when a shared RML server cannot supply workers_soft_limit workers.
cache_aligned_unique_ptr<thread_dispatcher> threading_control_impl::make_thread_dispatcher(threading_control& tc,
                                                                                           unsigned workers_soft_limit,
                                                                                           unsigned workers_hard_limit)
{
    stack_size_type stack_size = global_control_active_value_unsafe(global_control::thread_stack_size);

    cache_aligned_unique_ptr<thread_dispatcher> td =
        make_cache_aligned_unique<thread_dispatcher>(tc, workers_hard_limit, stack_size);
    // This check relies on the fact that for shared RML default_concurrency == max_concurrency
    if (!governor::UsePrivateRML && td->my_server->default_concurrency() < workers_soft_limit) {
        runtime_warning("RML might limit the number of workers to %u while %u is requested.\n",
                        td->my_server->default_concurrency(), workers_soft_limit);
    }

    return td;
}

// Builds the sub-components in dependency order: permit manager, thread
// dispatcher, then the serializer that relays thread requests from the permit
// manager to the dispatcher, and finally the cancellation/waiting helpers.
threading_control_impl::threading_control_impl(threading_control* tc) {
    unsigned workers_soft_limit{}, workers_hard_limit{};
    std::tie(workers_soft_limit, workers_hard_limit) = calculate_workers_limits();

    my_permit_manager = make_permit_manager(workers_soft_limit);
    my_thread_dispatcher = make_thread_dispatcher(*tc, workers_soft_limit, workers_hard_limit);
    my_thread_request_serializer =
        make_cache_aligned_unique<thread_request_serializer_proxy>(*my_thread_dispatcher, workers_soft_limit);
    my_permit_manager->set_thread_request_observer(*my_thread_request_serializer);

    my_cancellation_disseminator = make_cache_aligned_unique<cancellation_disseminator>();
    my_waiting_threads_monitor = make_cache_aligned_unique<thread_control_monitor>();
}

// Forwards termination to the thread dispatcher (blocking or asynchronous).
void threading_control_impl::release(bool blocking_terminate) {
    my_thread_dispatcher->release(blocking_terminate);
}

// Propagates a new soft limit to both the request serializer and the permit
// manager; the caller must not exceed the hard limit (asserted below).
void threading_control_impl::set_active_num_workers(unsigned soft_limit) {
    __TBB_ASSERT(soft_limit <= my_thread_dispatcher->my_num_workers_hard_limit, nullptr);
    my_thread_request_serializer->set_active_num_workers(soft_limit);
    my_permit_manager->set_active_num_workers(soft_limit);
}

// Creates the per-arena client objects in both sub-systems and bundles them
// into a threading_control_client. The clients do not become visible to the
// sub-systems until publish_client() is called.
threading_control_client threading_control_impl::create_client(arena& a) {
    pm_client* pm_client = my_permit_manager->create_client(a);
    thread_dispatcher_client* td_client = my_thread_dispatcher->create_client(a);

    return threading_control_client{pm_client, td_client};
}

// Captures the state (epoch, priority level, raw client pointers) needed for a
// later try_destroy_client() attempt.
threading_control_impl::client_snapshot threading_control_impl::prepare_client_destruction(threading_control_client client) {
    auto td_client = client.get_thread_dispatcher_client();
    return {td_client->get_aba_epoch(), td_client->priority_level(), td_client, client.get_pm_client()};
}

// Destroys the snapshot's clients iff the dispatcher-side unregistration
// succeeds (the recorded epoch presumably guards against ABA-style reuse of the
// client slot -- confirm in thread_dispatcher). Returns whether destruction
// actually happened.
bool threading_control_impl::try_destroy_client(threading_control_impl::client_snapshot snapshot) {
    if (my_thread_dispatcher->try_unregister_client(snapshot.my_td_client, snapshot.aba_epoch, snapshot.priority_level)) {
        my_permit_manager->unregister_and_destroy_client(*snapshot.my_pm_client);
        return true;
    }
    return false;
}

// Makes a client pair created by create_client() visible to both sub-systems.
void threading_control_impl::publish_client(threading_control_client tc_client) {
    my_permit_manager->register_client(tc_client.get_pm_client());
    my_thread_dispatcher->register_client(tc_client.get_thread_dispatcher_client());
}

// Thread (un)registration is delegated to the cancellation disseminator, which
// tracks threads for task-group-state propagation (see propagate_task_group_state).
void threading_control_impl::register_thread(thread_data& td) {
    my_cancellation_disseminator->register_thread(td);
}
void threading_control_impl::unregister_thread(thread_data& td) {
    my_cancellation_disseminator->unregister_thread(td);
}

// Broadcasts a task_group_context state change (mptr_state is a pointer to the
// affected atomic member) to all registered threads via the disseminator.
void threading_control_impl::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                        d1::task_group_context& src, uint32_t new_state)
{
    my_cancellation_disseminator->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control_impl::worker_stack_size() {
    return my_thread_dispatcher->worker_stack_size();
}

unsigned threading_control_impl::max_num_workers() {
    return my_thread_dispatcher->my_num_workers_hard_limit;
}

// Registers the mandatory-concurrency part of the request with the serializer
// first, then lets the permit manager rebalance demand for the client.
void threading_control_impl::adjust_demand(threading_control_client tc_client, int mandatory_delta, int workers_delta) {
    auto& c = *tc_client.get_pm_client();
    my_thread_request_serializer->register_mandatory_request(mandatory_delta);
    my_permit_manager->adjust_demand(c, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control_impl::get_waiting_threads_monitor() {
    return *my_waiting_threads_monitor;
}

// ---------------------------------------- threading_control -------------------------------------------------------------------

// Defined in global_control.cpp
void global_control_lock();
void global_control_unlock();

// Bumps the private reference count and, for public references, the public one
// as well. Every call site in this file holds g_threading_control_mutex first.
void threading_control::add_ref(bool is_public) {
    ++my_ref_count;
    if (is_public) {
        my_public_ref_count++;
    }
}

// Drops one reference (and the public count when is_public). Returns true when
// this was the overall last reference, in which case the global pointer is
// cleared so subsequent lookups fail.
bool threading_control::remove_ref(bool is_public) {
    if (is_public) {
        __TBB_ASSERT(g_threading_control == this, "Global threading control instance was destroyed prematurely?");
        __TBB_ASSERT(my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        --my_public_ref_count;
    }

    bool is_last_ref = --my_ref_count == 0;
    if (is_last_ref) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), nullptr);
        g_threading_control = nullptr;
    }

    return is_last_ref;
}

// Returns the live global instance with an extra reference added, or nullptr
// when none exists.
threading_control* threading_control::get_threading_control(bool is_public) {
    threading_control* control = g_threading_control;
    if (control) {
        control->add_ref(is_public);
    }

    return control;
}

// Returns the global threading_control with a public reference, constructing it
// on first use. Note the file-wide lock order: the global_control lock is taken
// before g_threading_control_mutex (see also register_public_reference()).
threading_control* threading_control::create_threading_control() {
    // Global control should be locked before threading_control_impl
    global_control_lock();

    threading_control* thr_control{ nullptr };
    try_call([&] {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);

        thr_control = get_threading_control(/*public = */ true);
        if (thr_control == nullptr) {
            thr_control = new (cache_aligned_allocate(sizeof(threading_control))) threading_control(/*public_ref = */ 1, /*private_ref = */ 1);
            thr_control->my_pimpl = make_cache_aligned_unique<threading_control_impl>(thr_control);

            __TBB_InitOnce::add_ref();

            // An active scheduler_handle pins the instance with one extra
            // public and one extra private reference.
            if (global_control_active_value_unsafe(global_control::scheduler_handle)) {
                ++thr_control->my_public_ref_count;
                ++thr_control->my_ref_count;
            }

            g_threading_control = thr_control;
        }
    }).on_exception([&] {
        // Roll back on a construction failure: release the global_control lock
        // and free the partially constructed object.
        global_control_unlock();

        cache_aligned_deleter deleter{};
        deleter(thr_control);
    });

    global_control_unlock();
    return thr_control;
}

// Destroys and deallocates this instance, then drops the __TBB_InitOnce
// reference taken in create_threading_control().
void threading_control::destroy () {
    cache_aligned_deleter deleter;
    deleter(this);
    __TBB_InitOnce::remove_ref();
}
// Spins until this thread holds the only public reference and no private
// references remain, releasing g_threading_control_mutex around the inner spin
// (and re-acquiring it afterwards) so other threads can make progress.
void threading_control::wait_last_reference(global_mutex_type::scoped_lock& lock) {
    while (my_public_ref_count.load(std::memory_order_relaxed) == 1 && my_ref_count.load(std::memory_order_relaxed) > 1) {
        lock.release();
        // To guarantee that request_close_connection() is called by the last external thread, we need to wait till all
        // references are released. Re-read my_public_ref_count to limit waiting if new external threads are created.
        // Theoretically, new private references to the threading control can be added during waiting making it potentially
        // endless.
        // TODO: revise why the weak scheduler needs threading control's pointer and try to remove this wait.
        // Note that the threading control should know about its schedulers for cancellation/exception/priority propagation,
        // see e.g. task_group_context::cancel_group_execution()
        while (my_public_ref_count.load(std::memory_order_acquire) == 1 && my_ref_count.load(std::memory_order_acquire) > 1) {
            yield();
        }
        lock.acquire(g_threading_control_mutex);
    }
}

// Drops a reference under the global mutex; if it was the last one, tears down
// the impl outside the lock. Returns true only when this call was the last
// reference AND a blocking terminate was requested (and thus performed).
bool threading_control::release(bool is_public, bool blocking_terminate) {
    bool do_release = false;
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        if (blocking_terminate) {
            __TBB_ASSERT(is_public, "Only an object with a public reference can request the blocking terminate");
            wait_last_reference(lock);
        }
        do_release = remove_ref(is_public);
    }

    if (do_release) {
        __TBB_ASSERT(!my_public_ref_count.load(std::memory_order_relaxed), "No public references must remain if we remove the threading control.");
        // inform RML that blocking termination is required
        my_pimpl->release(blocking_terminate);
        return blocking_terminate;
    }
    return false;
}

threading_control::threading_control(unsigned public_ref, unsigned ref) : my_public_ref_count(public_ref), my_ref_count(ref)
{}

// Returns the global instance with a new public reference, creating it on first
// use. The mutex is dropped before creation because create_threading_control()
// must acquire the global_control lock first (lock-order discipline).
threading_control* threading_control::register_public_reference() {
    threading_control* control{nullptr};
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    control = get_threading_control(/*public = */ true);
    if (!control) {
        // We are going to create threading_control_impl, we should acquire mutexes in right order
        lock.release();
        control = create_threading_control();
    }

    return control;
}

// Releases one public reference; forwards the result of release().
bool threading_control::unregister_public_reference(bool blocking_terminate) {
    __TBB_ASSERT(g_threading_control, "Threading control should exist until last public reference");
    __TBB_ASSERT(g_threading_control->my_public_ref_count.load(std::memory_order_relaxed), nullptr);
    return g_threading_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
}

// Adds a private reference on behalf of the new arena client (under the global
// mutex), then delegates client creation to the impl. The matching release
// happens in try_destroy_client().
threading_control_client threading_control::create_client(arena& a) {
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        add_ref(/*public = */ false);
    }

    return my_pimpl->create_client(a);
}

void threading_control::publish_client(threading_control_client client) {
    return my_pimpl->publish_client(client);
}

threading_control::client_snapshot threading_control::prepare_client_destruction(threading_control_client client) {
    return my_pimpl->prepare_client_destruction(client);
}

// On successful client destruction, also drops the private reference that
// create_client() added.
bool threading_control::try_destroy_client(threading_control::client_snapshot deleter) {
    bool res = my_pimpl->try_destroy_client(deleter);
    if (res) {
        release(/*public = */ false, /*blocking_terminate = */ false);
    }
    return res;
}

// Applies a new soft limit if a threading control currently exists, pinning it
// with a temporary private reference for the duration of the call.
// NOTE(review): unlike the other callers of get_threading_control() in this
// file, this path does not hold g_threading_control_mutex around the lookup --
// confirm the race with concurrent teardown is acceptable here.
void threading_control::set_active_num_workers(unsigned soft_limit) {
    threading_control* thr_control = get_threading_control(/*public = */ false);
    if (thr_control != nullptr) {
        thr_control->my_pimpl->set_active_num_workers(soft_limit);
        thr_control->release(/*is_public=*/false, /*blocking_terminate=*/false);
    }
}
// Reports whether a global threading control currently exists.
bool threading_control::is_present() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control != nullptr;
}

// Takes a public reference on the existing instance, if any; returns whether
// one was taken. Unlike register_public_reference(), this never creates one.
bool threading_control::register_lifetime_control() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return get_threading_control(/*public = */ true) != nullptr;
}

// Drops the lifetime-control public reference if an instance exists; forwards
// release()'s result, or true when there was nothing to release.
bool threading_control::unregister_lifetime_control(bool blocking_terminate) {
    threading_control* thr_control{nullptr};
    {
        global_mutex_type::scoped_lock lock(g_threading_control_mutex);
        thr_control = g_threading_control;
    }

    bool released{true};
    if (thr_control) {
        released = thr_control->release(/*public = */ true, /*blocking_terminate = */ blocking_terminate);
    }

    return released;
}

// The methods below are thin forwarding wrappers over the pimpl.

void threading_control::register_thread(thread_data& td) {
    my_pimpl->register_thread(td);
}

void threading_control::unregister_thread(thread_data& td) {
    my_pimpl->unregister_thread(td);
}

void threading_control::propagate_task_group_state(std::atomic<uint32_t> d1::task_group_context::*mptr_state,
                                                   d1::task_group_context& src, uint32_t new_state)
{
    my_pimpl->propagate_task_group_state(mptr_state, src, new_state);
}

std::size_t threading_control::worker_stack_size() {
    return my_pimpl->worker_stack_size();
}

// Static: safe to call with no instance -- returns 0 when no threading control
// exists (checked under the global mutex).
unsigned threading_control::max_num_workers() {
    global_mutex_type::scoped_lock lock(g_threading_control_mutex);
    return g_threading_control ? g_threading_control->my_pimpl->max_num_workers() : 0;
}

void threading_control::adjust_demand(threading_control_client client, int mandatory_delta, int workers_delta) {
    my_pimpl->adjust_demand(client, mandatory_delta, workers_delta);
}

thread_control_monitor& threading_control::get_waiting_threads_monitor() {
    return my_pimpl->get_waiting_threads_monitor();
}

} // r1
} // detail
} // tbb