1 /* 2 Copyright (c) 2022-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "misc.h" 18 #include "thread_request_serializer.h" 19 20 namespace tbb { 21 namespace detail { 22 namespace r1 { 23 24 thread_request_serializer::thread_request_serializer(thread_dispatcher& td, int soft_limit) 25 : my_thread_dispatcher(td) 26 , my_soft_limit(soft_limit) 27 {} 28 29 void thread_request_serializer::update(int delta) { 30 constexpr std::uint64_t delta_mask = (pending_delta_base << 1) - 1; 31 constexpr std::uint64_t counter_value = delta_mask + 1; 32 33 int prev_pending_delta = my_pending_delta.fetch_add(counter_value + delta); 34 35 // There is a pseudo request aggregator, so only thread that see pending_delta_base in my_pending_delta 36 // Will enter to critical section and call adjust_job_count_estimate 37 if (prev_pending_delta == pending_delta_base) { 38 delta = int(my_pending_delta.exchange(pending_delta_base) & delta_mask) - int(pending_delta_base); 39 mutex_type::scoped_lock lock(my_mutex); 40 my_total_request += delta; 41 delta = limit_delta(delta, my_soft_limit, my_total_request); 42 my_thread_dispatcher.adjust_job_count_estimate(delta); 43 } 44 } 45 46 void thread_request_serializer::set_active_num_workers(int soft_limit) { 47 mutex_type::scoped_lock lock(my_mutex); 48 int delta = soft_limit - my_soft_limit; 49 delta = limit_delta(delta, my_total_request, soft_limit); 50 my_thread_dispatcher.adjust_job_count_estimate(delta); 51 my_soft_limit = soft_limit; 52 } 53 54 int thread_request_serializer::limit_delta(int delta, int limit, int new_value) { 55 // This method can be described with such pseudocode: 56 // bool above_limit = prev_value >= limit && new_value >= limit; 57 // bool below_limit = prev_value <= limit && new_value <= limit; 58 // enum request_type { ABOVE_LIMIT, CROSS_LIMIT, BELOW_LIMIT }; 59 // request = above_limit ? ABOVE_LIMIT : below_limit ? BELOW_LIMIT : CROSS_LIMIT; 60 61 // switch (request) { 62 // case ABOVE_LIMIT: 63 // delta = 0; 64 // case CROSS_LIMIT: 65 // delta = delta > 0 ? limit - prev_value : new_value - limit; 66 // case BELOW_LIMIT: 67 // // No changes to delta 68 // } 69 70 int prev_value = new_value - delta; 71 72 // actual new_value and prev_value cannot exceed the limit 73 new_value = min(limit, new_value); 74 prev_value = min(limit, prev_value); 75 return new_value - prev_value; 76 } 77 78 79 thread_request_serializer_proxy::thread_request_serializer_proxy(thread_dispatcher& td, int soft_limit) : my_serializer(td, soft_limit) 80 {} 81 82 void thread_request_serializer_proxy::register_mandatory_request(int mandatory_delta) { 83 if (mandatory_delta != 0) { 84 mutex_type::scoped_lock lock(my_mutex, /* is_write = */ false); 85 int prev_value = my_num_mandatory_requests.fetch_add(mandatory_delta); 86 87 const bool should_try_enable = mandatory_delta > 0 && prev_value == 0; 88 const bool should_try_disable = mandatory_delta < 0 && prev_value == 1; 89 90 if (should_try_enable) { 91 enable_mandatory_concurrency(lock); 92 } else if (should_try_disable) { 93 disable_mandatory_concurrency(lock); 94 } 95 } 96 } 97 98 void thread_request_serializer_proxy::set_active_num_workers(int soft_limit) { 99 mutex_type::scoped_lock lock(my_mutex, /* is_write = */ true); 100 101 if (soft_limit != 0) { 102 my_is_mandatory_concurrency_enabled = false; 103 my_serializer.set_active_num_workers(soft_limit); 104 } else { 105 if (my_num_mandatory_requests > 0 && !my_is_mandatory_concurrency_enabled) { 106 my_is_mandatory_concurrency_enabled = true; 107 my_serializer.set_active_num_workers(1); 108 } 109 } 110 } 111 112 void thread_request_serializer_proxy::update(int delta) { my_serializer.update(delta); } 113 114 void thread_request_serializer_proxy::enable_mandatory_concurrency(mutex_type::scoped_lock& lock) { 115 lock.upgrade_to_writer(); 116 bool still_should_enable = my_num_mandatory_requests.load(std::memory_order_relaxed) > 0 && 117 !my_is_mandatory_concurrency_enabled && my_serializer.is_no_workers_avaliable(); 118 119 if (still_should_enable) { 120 my_is_mandatory_concurrency_enabled = true; 121 my_serializer.set_active_num_workers(1); 122 } 123 } 124 125 void thread_request_serializer_proxy::disable_mandatory_concurrency(mutex_type::scoped_lock& lock) { 126 lock.upgrade_to_writer(); 127 bool still_should_disable = my_num_mandatory_requests.load(std::memory_order_relaxed) <= 0 && 128 my_is_mandatory_concurrency_enabled && !my_serializer.is_no_workers_avaliable(); 129 130 if (still_should_disable) { 131 my_is_mandatory_concurrency_enabled = false; 132 my_serializer.set_active_num_workers(0); 133 } 134 } 135 136 } // r1 137 } // detail 138 } // tbb 139