1 /*
2     Copyright (c) 2022-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "misc.h"
18 #include "thread_request_serializer.h"
19 
20 namespace tbb {
21 namespace detail {
22 namespace r1 {
23 
thread_request_serializer(thread_dispatcher & td,int soft_limit)24 thread_request_serializer::thread_request_serializer(thread_dispatcher& td, int soft_limit)
25     : my_thread_dispatcher(td)
26     , my_soft_limit(soft_limit)
27 {}
28 
update(int delta)29 void thread_request_serializer::update(int delta) {
30     constexpr std::uint64_t delta_mask = (pending_delta_base << 1) - 1;
31     constexpr std::uint64_t counter_value = delta_mask + 1;
32 
33     int prev_pending_delta = my_pending_delta.fetch_add(counter_value + delta);
34 
35     // There is a pseudo request aggregator, so only thread that see pending_delta_base in my_pending_delta
36     // Will enter to critical section and call adjust_job_count_estimate
37     if (prev_pending_delta == pending_delta_base) {
38         delta = int(my_pending_delta.exchange(pending_delta_base) & delta_mask) - int(pending_delta_base);
39         mutex_type::scoped_lock lock(my_mutex);
40         my_total_request += delta;
41         delta = limit_delta(delta, my_soft_limit, my_total_request);
42         my_thread_dispatcher.adjust_job_count_estimate(delta);
43     }
44 }
45 
set_active_num_workers(int soft_limit)46 void thread_request_serializer::set_active_num_workers(int soft_limit) {
47     mutex_type::scoped_lock lock(my_mutex);
48     int delta = soft_limit - my_soft_limit;
49     delta = limit_delta(delta, my_total_request, soft_limit);
50     my_thread_dispatcher.adjust_job_count_estimate(delta);
51     my_soft_limit = soft_limit;
52 }
53 
limit_delta(int delta,int limit,int new_value)54 int thread_request_serializer::limit_delta(int delta, int limit, int new_value) {
55     // This method can be described with such pseudocode:
56     // bool above_limit = prev_value >= limit && new_value >= limit;
57     // bool below_limit = prev_value <= limit && new_value <= limit;
58     // enum request_type { ABOVE_LIMIT, CROSS_LIMIT, BELOW_LIMIT };
59     // request = above_limit ? ABOVE_LIMIT : below_limit ? BELOW_LIMIT : CROSS_LIMIT;
60 
61     // switch (request) {
62     // case ABOVE_LIMIT:
63     //     delta = 0;
64     // case CROSS_LIMIT:
65     //     delta = delta > 0 ? limit - prev_value : new_value - limit;
66     // case BELOW_LIMIT:
67     //     // No changes to delta
68     // }
69 
70    int prev_value = new_value - delta;
71 
72     // actual new_value and prev_value cannot exceed the limit
73     new_value = min(limit, new_value);
74     prev_value = min(limit, prev_value);
75     return new_value - prev_value;
76 }
77 
78 
thread_request_serializer_proxy(thread_dispatcher & td,int soft_limit)79 thread_request_serializer_proxy::thread_request_serializer_proxy(thread_dispatcher& td, int soft_limit) : my_serializer(td, soft_limit)
80 {}
81 
register_mandatory_request(int mandatory_delta)82 void thread_request_serializer_proxy::register_mandatory_request(int mandatory_delta) {
83     if (mandatory_delta != 0) {
84         mutex_type::scoped_lock lock(my_mutex, /* is_write = */ false);
85         int prev_value = my_num_mandatory_requests.fetch_add(mandatory_delta);
86 
87         const bool should_try_enable = mandatory_delta > 0 && prev_value == 0;
88         const bool should_try_disable = mandatory_delta < 0 && prev_value == 1;
89 
90         if (should_try_enable) {
91             enable_mandatory_concurrency(lock);
92         } else if (should_try_disable) {
93             disable_mandatory_concurrency(lock);
94         }
95     }
96 }
97 
set_active_num_workers(int soft_limit)98 void thread_request_serializer_proxy::set_active_num_workers(int soft_limit) {
99     mutex_type::scoped_lock lock(my_mutex, /* is_write = */ true);
100 
101     if (soft_limit != 0) {
102         my_is_mandatory_concurrency_enabled = false;
103         my_serializer.set_active_num_workers(soft_limit);
104     } else {
105         if (my_num_mandatory_requests > 0 && !my_is_mandatory_concurrency_enabled) {
106             my_is_mandatory_concurrency_enabled = true;
107             my_serializer.set_active_num_workers(1);
108         }
109     }
110 }
111 
update(int delta)112 void thread_request_serializer_proxy::update(int delta) { my_serializer.update(delta); }
113 
enable_mandatory_concurrency(mutex_type::scoped_lock & lock)114 void thread_request_serializer_proxy::enable_mandatory_concurrency(mutex_type::scoped_lock& lock) {
115     lock.upgrade_to_writer();
116     bool still_should_enable = my_num_mandatory_requests.load(std::memory_order_relaxed) > 0 &&
117             !my_is_mandatory_concurrency_enabled && my_serializer.is_no_workers_avaliable();
118 
119     if (still_should_enable) {
120         my_is_mandatory_concurrency_enabled = true;
121         my_serializer.set_active_num_workers(1);
122     }
123 }
124 
disable_mandatory_concurrency(mutex_type::scoped_lock & lock)125 void thread_request_serializer_proxy::disable_mandatory_concurrency(mutex_type::scoped_lock& lock) {
126     lock.upgrade_to_writer();
127     bool still_should_disable = my_num_mandatory_requests.load(std::memory_order_relaxed) <= 0 &&
128             my_is_mandatory_concurrency_enabled && !my_serializer.is_no_workers_avaliable();
129 
130     if (still_should_disable) {
131         my_is_mandatory_concurrency_enabled = false;
132         my_serializer.set_active_num_workers(0);
133     }
134 }
135 
136 } // r1
137 } // detail
138 } // tbb
139