/*
    Copyright (c) 2022-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
16*c4568449SPavel Kumbrasev 
17*c4568449SPavel Kumbrasev #include "misc.h"
18*c4568449SPavel Kumbrasev #include "thread_request_serializer.h"
19*c4568449SPavel Kumbrasev 
20*c4568449SPavel Kumbrasev namespace tbb {
21*c4568449SPavel Kumbrasev namespace detail {
22*c4568449SPavel Kumbrasev namespace r1 {
23*c4568449SPavel Kumbrasev 
thread_request_serializer(thread_dispatcher & td,int soft_limit)24*c4568449SPavel Kumbrasev thread_request_serializer::thread_request_serializer(thread_dispatcher& td, int soft_limit)
25*c4568449SPavel Kumbrasev     : my_thread_dispatcher(td)
26*c4568449SPavel Kumbrasev     , my_soft_limit(soft_limit)
27*c4568449SPavel Kumbrasev {}
28*c4568449SPavel Kumbrasev 
// Records a thread-count request and forwards the aggregated delta to the
// thread dispatcher. Concurrent callers are combined by a lock-free
// pseudo-aggregator packed into my_pending_delta: the low `delta_mask` bits
// hold pending_delta_base plus the sum of pending deltas, while each call
// also adds `counter_value` (one unit above the low field) to mark itself
// in flight. Only one caller at a time proceeds into the critical section
// and calls adjust_job_count_estimate().
void thread_request_serializer::update(int delta) {
    constexpr std::uint64_t delta_mask = (pending_delta_base << 1) - 1;
    constexpr std::uint64_t counter_value = delta_mask + 1;

    // Publish this request: bump the in-flight marker (upper bits) and
    // accumulate `delta` into the low field in a single atomic RMW.
    // NOTE(review): the 64-bit previous value is narrowed to int here;
    // this relies on pending_delta_base and the accumulated state fitting
    // in int's range — confirm against pending_delta_base's definition.
    int prev_pending_delta = my_pending_delta.fetch_add(counter_value + delta);

    // Pseudo request aggregator: only the thread that observed exactly
    // pending_delta_base (i.e. no other caller in flight and no pending
    // delta) becomes the aggregator and enters the critical section to
    // call adjust_job_count_estimate() on behalf of everyone.
    if (prev_pending_delta == pending_delta_base) {
        // Drain the accumulator back to its neutral value and extract the
        // combined signed delta from the low field (biased by
        // pending_delta_base so negative sums are representable).
        delta = int(my_pending_delta.exchange(pending_delta_base) & delta_mask) - int(pending_delta_base);
        mutex_type::scoped_lock lock(my_mutex);
        my_total_request += delta;
        // Clip the change so the dispatcher is never asked for more than
        // my_soft_limit workers in total.
        delta = limit_delta(delta, my_soft_limit, my_total_request);
        my_thread_dispatcher.adjust_job_count_estimate(delta);
    }
}
45*c4568449SPavel Kumbrasev 
set_active_num_workers(int soft_limit)46*c4568449SPavel Kumbrasev void thread_request_serializer::set_active_num_workers(int soft_limit) {
47*c4568449SPavel Kumbrasev     mutex_type::scoped_lock lock(my_mutex);
48*c4568449SPavel Kumbrasev     int delta = soft_limit - my_soft_limit;
49*c4568449SPavel Kumbrasev     delta = limit_delta(delta, my_total_request, soft_limit);
50*c4568449SPavel Kumbrasev     my_thread_dispatcher.adjust_job_count_estimate(delta);
51*c4568449SPavel Kumbrasev     my_soft_limit = soft_limit;
52*c4568449SPavel Kumbrasev }
53*c4568449SPavel Kumbrasev 
limit_delta(int delta,int limit,int new_value)54*c4568449SPavel Kumbrasev int thread_request_serializer::limit_delta(int delta, int limit, int new_value) {
55*c4568449SPavel Kumbrasev     // This method can be described with such pseudocode:
56*c4568449SPavel Kumbrasev     // bool above_limit = prev_value >= limit && new_value >= limit;
57*c4568449SPavel Kumbrasev     // bool below_limit = prev_value <= limit && new_value <= limit;
58*c4568449SPavel Kumbrasev     // enum request_type { ABOVE_LIMIT, CROSS_LIMIT, BELOW_LIMIT };
59*c4568449SPavel Kumbrasev     // request = above_limit ? ABOVE_LIMIT : below_limit ? BELOW_LIMIT : CROSS_LIMIT;
60*c4568449SPavel Kumbrasev 
61*c4568449SPavel Kumbrasev     // switch (request) {
62*c4568449SPavel Kumbrasev     // case ABOVE_LIMIT:
63*c4568449SPavel Kumbrasev     //     delta = 0;
64*c4568449SPavel Kumbrasev     // case CROSS_LIMIT:
65*c4568449SPavel Kumbrasev     //     delta = delta > 0 ? limit - prev_value : new_value - limit;
66*c4568449SPavel Kumbrasev     // case BELOW_LIMIT:
67*c4568449SPavel Kumbrasev     //     // No changes to delta
68*c4568449SPavel Kumbrasev     // }
69*c4568449SPavel Kumbrasev 
70*c4568449SPavel Kumbrasev    int prev_value = new_value - delta;
71*c4568449SPavel Kumbrasev 
72*c4568449SPavel Kumbrasev     // actual new_value and prev_value cannot exceed the limit
73*c4568449SPavel Kumbrasev     new_value = min(limit, new_value);
74*c4568449SPavel Kumbrasev     prev_value = min(limit, prev_value);
75*c4568449SPavel Kumbrasev     return new_value - prev_value;
76*c4568449SPavel Kumbrasev }
77*c4568449SPavel Kumbrasev 
78*c4568449SPavel Kumbrasev 
thread_request_serializer_proxy(thread_dispatcher & td,int soft_limit)79*c4568449SPavel Kumbrasev thread_request_serializer_proxy::thread_request_serializer_proxy(thread_dispatcher& td, int soft_limit) : my_serializer(td, soft_limit)
80*c4568449SPavel Kumbrasev {}
81*c4568449SPavel Kumbrasev 
register_mandatory_request(int mandatory_delta)82*c4568449SPavel Kumbrasev void thread_request_serializer_proxy::register_mandatory_request(int mandatory_delta) {
83*c4568449SPavel Kumbrasev     if (mandatory_delta != 0) {
84*c4568449SPavel Kumbrasev         mutex_type::scoped_lock lock(my_mutex, /* is_write = */ false);
85*c4568449SPavel Kumbrasev         int prev_value = my_num_mandatory_requests.fetch_add(mandatory_delta);
86*c4568449SPavel Kumbrasev 
87*c4568449SPavel Kumbrasev         const bool should_try_enable = mandatory_delta > 0 && prev_value == 0;
88*c4568449SPavel Kumbrasev         const bool should_try_disable = mandatory_delta < 0 && prev_value == 1;
89*c4568449SPavel Kumbrasev 
90*c4568449SPavel Kumbrasev         if (should_try_enable) {
91*c4568449SPavel Kumbrasev             enable_mandatory_concurrency(lock);
92*c4568449SPavel Kumbrasev         } else if (should_try_disable) {
93*c4568449SPavel Kumbrasev             disable_mandatory_concurrency(lock);
94*c4568449SPavel Kumbrasev         }
95*c4568449SPavel Kumbrasev     }
96*c4568449SPavel Kumbrasev }
97*c4568449SPavel Kumbrasev 
set_active_num_workers(int soft_limit)98*c4568449SPavel Kumbrasev void thread_request_serializer_proxy::set_active_num_workers(int soft_limit) {
99*c4568449SPavel Kumbrasev     mutex_type::scoped_lock lock(my_mutex, /* is_write = */ true);
100*c4568449SPavel Kumbrasev 
101*c4568449SPavel Kumbrasev     if (soft_limit != 0) {
102*c4568449SPavel Kumbrasev         my_is_mandatory_concurrency_enabled = false;
103*c4568449SPavel Kumbrasev         my_serializer.set_active_num_workers(soft_limit);
104*c4568449SPavel Kumbrasev     } else {
105*c4568449SPavel Kumbrasev         if (my_num_mandatory_requests > 0 && !my_is_mandatory_concurrency_enabled) {
106*c4568449SPavel Kumbrasev             my_is_mandatory_concurrency_enabled = true;
107*c4568449SPavel Kumbrasev             my_serializer.set_active_num_workers(1);
108*c4568449SPavel Kumbrasev         }
109*c4568449SPavel Kumbrasev     }
110*c4568449SPavel Kumbrasev }
111*c4568449SPavel Kumbrasev 
update(int delta)112*c4568449SPavel Kumbrasev void thread_request_serializer_proxy::update(int delta) { my_serializer.update(delta); }
113*c4568449SPavel Kumbrasev 
enable_mandatory_concurrency(mutex_type::scoped_lock & lock)114*c4568449SPavel Kumbrasev void thread_request_serializer_proxy::enable_mandatory_concurrency(mutex_type::scoped_lock& lock) {
115*c4568449SPavel Kumbrasev     lock.upgrade_to_writer();
116*c4568449SPavel Kumbrasev     bool still_should_enable = my_num_mandatory_requests.load(std::memory_order_relaxed) > 0 &&
117*c4568449SPavel Kumbrasev             !my_is_mandatory_concurrency_enabled && my_serializer.is_no_workers_avaliable();
118*c4568449SPavel Kumbrasev 
119*c4568449SPavel Kumbrasev     if (still_should_enable) {
120*c4568449SPavel Kumbrasev         my_is_mandatory_concurrency_enabled = true;
121*c4568449SPavel Kumbrasev         my_serializer.set_active_num_workers(1);
122*c4568449SPavel Kumbrasev     }
123*c4568449SPavel Kumbrasev }
124*c4568449SPavel Kumbrasev 
disable_mandatory_concurrency(mutex_type::scoped_lock & lock)125*c4568449SPavel Kumbrasev void thread_request_serializer_proxy::disable_mandatory_concurrency(mutex_type::scoped_lock& lock) {
126*c4568449SPavel Kumbrasev     lock.upgrade_to_writer();
127*c4568449SPavel Kumbrasev     bool still_should_disable = my_num_mandatory_requests.load(std::memory_order_relaxed) <= 0 &&
128*c4568449SPavel Kumbrasev             my_is_mandatory_concurrency_enabled && !my_serializer.is_no_workers_avaliable();
129*c4568449SPavel Kumbrasev 
130*c4568449SPavel Kumbrasev     if (still_should_disable) {
131*c4568449SPavel Kumbrasev         my_is_mandatory_concurrency_enabled = false;
132*c4568449SPavel Kumbrasev         my_serializer.set_active_num_workers(0);
133*c4568449SPavel Kumbrasev     }
134*c4568449SPavel Kumbrasev }
135*c4568449SPavel Kumbrasev 
136*c4568449SPavel Kumbrasev } // r1
137*c4568449SPavel Kumbrasev } // detail
138*c4568449SPavel Kumbrasev } // tbb
139