/*
Copyright (c) 2020-2023 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#ifndef __TBB_test_common_utils_concurrency_limit_H
#define __TBB_test_common_utils_concurrency_limit_H

#include "config.h"
#include "utils_assert.h"
#include "utils_report.h"
#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/task_scheduler_observer.h"
#include "oneapi/tbb/enumerable_thread_specific.h"

#include <cstddef>
#include <vector>
#include <algorithm>
#include <atomic>  // std::atomic
#include <cerrno>  // errno, EINVAL
#include <climits> // CHAR_BIT
#include <utility> // std::pair
#if _WIN32 || _WIN64
#include <windows.h>
#elif __unix__
#include <unistd.h>
#include <pthread.h> // pthread_getschedparam, pthread_setschedparam
#if __linux__
#include <sys/sysinfo.h>
#endif
#include <string.h>
#include <sched.h>
#if __FreeBSD__
#include <errno.h>
#include <sys/param.h>
#include <sys/cpuset.h>
#endif
#endif
#include <thread>
namespace utils {
using thread_num_type = std::size_t;

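// Maximal concurrency available to the test: the default task_arena
// concurrency, computed once and cached.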
inline thread_num_type get_platform_max_threads() {
    static thread_num_type platform_max_threads = tbb::this_task_arena::max_concurrency();
    return platform_max_threads;
}

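// Builds a short, non-monotonic list of thread counts in [1, max_threads]:
// the gap between consecutive values grows by one each step, and
// max_threads itself is always included. For example, max_threads == 16
// yields 1, 2, 4, 7, 11, 16, which the rotation below reorders to
// 7, 11, 16, 1, 2, 4.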
inline std::vector<thread_num_type> concurrency_range(thread_num_type max_threads) {
    std::vector<thread_num_type> threads_range;
    thread_num_type step = 1;
    for (thread_num_type thread_num = 1; thread_num <= max_threads; thread_num += step++)
        threads_range.push_back(thread_num);
    if (threads_range.back() != max_threads)
        threads_range.push_back(max_threads);
    // rotate in order to make threads_range non-monotonic
    std::rotate(threads_range.begin(), threads_range.begin() + threads_range.size()/2, threads_range.end());
    return threads_range;
}

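// The same range computed for the platform maximum. A typical (hypothetical)
// test loop over the range could look like:
//     for (utils::thread_num_type n : utils::concurrency_range()) {
//         tbb::task_arena arena(static_cast<int>(n));
//         arena.execute([&] { /* run the test at concurrency n */ });
//     }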
inline std::vector<thread_num_type> concurrency_range() {
    static std::vector<thread_num_type> threads_range = concurrency_range(get_platform_max_threads());
    return threads_range;
}

#if !__TBB_TEST_SKIP_AFFINITY

static int maxProcs = 0;

static int get_max_procs() {
    if (!maxProcs) {
#if _WIN32 || _WIN64
        DWORD_PTR pam, sam, m = 1;
        GetProcessAffinityMask( GetCurrentProcess(), &pam, &sam );
        int nproc = 0;
        for ( std::size_t i = 0; i < sizeof(DWORD_PTR) * CHAR_BIT; ++i, m <<= 1 ) {
            if ( pam & m )
                ++nproc;
        }
        maxProcs = nproc;
#elif __linux__
        cpu_set_t mask;
        int result = 0;
        sched_getaffinity(0, sizeof(cpu_set_t), &mask);
        int nproc = sysconf(_SC_NPROCESSORS_ONLN);
        for (int i = 0; i < nproc; ++i) {
            if (CPU_ISSET(i, &mask)) ++result;
        }
        maxProcs = result;
#else // FreeBSD
        maxProcs = sysconf(_SC_NPROCESSORS_ONLN);
#endif
    }
    return maxProcs;
}

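// Returns the smallest CPU index present in the process affinity mask
// (Linux only; other platforms currently fall back to index 0).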
int get_start_affinity_process() {
#if __linux__
    cpu_set_t mask;
    sched_getaffinity(0, sizeof(cpu_set_t), &mask);

    int result = -1;

    int nproc = sysconf(_SC_NPROCESSORS_ONLN);
    for (int i = 0; i < nproc; ++i) {
        if (CPU_ISSET(i, &mask)) {
            result = i;
            break;
        }
    }
    ASSERT(result != -1, nullptr);
    return result;
#else
    // TODO: add affinity support for Windows and FreeBSD
    return 0;
#endif
}

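// Restricts the process affinity mask to max_threads logical CPUs, starting
// from the first CPU the process is already allowed to run on, and returns
// the number of threads actually granted. Example (hypothetical):
//     int granted = utils::limit_number_of_threads(2); // at most 2 CPUs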
int limit_number_of_threads(int max_threads) {
    ASSERT(max_threads >= 1, "The limited number of threads should be positive");
    maxProcs = get_max_procs();
    if (maxProcs < max_threads) {
        // Assume the process affinity mask is not restricted, so the number
        // of available threads equals maxProcs.
        return maxProcs;
    }
#if _WIN32 || _WIN64
    ASSERT(max_threads <= 64, "limit_number_of_threads doesn't support more than 64 threads on Windows");
    DWORD_PTR mask = 1;
    for (int i = 1; i < max_threads; ++i) {
        mask |= mask << 1;
    }
    bool err = !SetProcessAffinityMask(GetCurrentProcess(), mask);
#else
#if __linux__
    using mask_t = cpu_set_t;
#define setaffinity(mask) sched_setaffinity(getpid(), sizeof(mask_t), &mask)
#else /* __FreeBSD__ */
    using mask_t = cpuset_t;
#define setaffinity(mask) cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1, sizeof(mask_t), &mask)
#endif

    mask_t new_mask;
    CPU_ZERO(&new_mask);

    int mask_size = int(sizeof(mask_t) * CHAR_BIT);
    if (mask_size < maxProcs) {
        REPORT("The mask size doesn't seem to be big enough to call setaffinity. The call may return an error.");
    }

    ASSERT(max_threads <= mask_size, "The mask size is not enough to set the requested number of threads.");
    int st = get_start_affinity_process();
    for (int i = st; i < st + max_threads; ++i) {
        CPU_SET(i, &new_mask);
    }
    int err = setaffinity(new_mask);
#endif
    ASSERT(!err, "Setting process affinity failed");
    return max_threads;
}

#endif // __TBB_TEST_SKIP_AFFINITY

// TODO: consider using cpuset_setaffinity/sched_getaffinity on FreeBSD to enable the functionality
#define OS_AFFINITY_SYSCALL_PRESENT (__linux__ && !__ANDROID__)

#if OS_AFFINITY_SYSCALL_PRESENT
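// Determines a CPU-slot count (ncpus) large enough to hold the process
// affinity mask, growing the allocation while sched_getaffinity fails with
// EINVAL, and collects the indexes of all CPUs the process may run on into
// free_indexes.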
void get_thread_affinity_mask(std::size_t& ncpus, std::vector<int>& free_indexes) {
    cpu_set_t* mask = nullptr;
    ncpus = sizeof(cpu_set_t) * CHAR_BIT;
    do {
        mask = CPU_ALLOC(ncpus);
        if (!mask) break;
        const size_t size = CPU_ALLOC_SIZE(ncpus);
        CPU_ZERO_S(size, mask);
        const int err = sched_getaffinity(0, size, mask);
        if (!err) break;

        CPU_FREE(mask);
        mask = nullptr;
        if (errno != EINVAL) break;
        ncpus <<= 1;
    } while (ncpus < 16 * 1024 /* some reasonable limit */);
    ASSERT(mask, "Failed to obtain process affinity mask.");

    const size_t size = CPU_ALLOC_SIZE(ncpus);
    const int num_cpus = CPU_COUNT_S(size, mask);
    // Scan the whole mask: the allowed CPUs are not necessarily the first
    // num_cpus indexes.
    for (std::size_t i = 0; i < ncpus && int(free_indexes.size()) < num_cpus; ++i) {
        if (CPU_ISSET_S(int(i), size, mask)) {
            free_indexes.push_back(int(i));
        }
    }

    CPU_FREE(mask);
}

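// Pins the calling thread to a single CPU chosen round-robin from
// free_indexes; curr_idx is shared between callers so that successive
// threads land on different CPUs.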
void pin_thread_imp(std::size_t ncpus, std::vector<int>& free_indexes, std::atomic<int>& curr_idx) {
    const size_t size = CPU_ALLOC_SIZE(ncpus);

    ASSERT(free_indexes.size() > 0, nullptr);
    int mapped_idx = free_indexes[curr_idx++ % free_indexes.size()];

    cpu_set_t* target_mask = CPU_ALLOC(ncpus);
    ASSERT(target_mask, nullptr);
    CPU_ZERO_S(size, target_mask);
    CPU_SET_S(mapped_idx, size, target_mask);
    const int err = sched_setaffinity(0, size, target_mask);
    ASSERT(err == 0, "Failed to set thread affinity");

    CPU_FREE(target_mask);
}
#endif

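// Captures the process affinity mask at construction; each pin_thread() call
// then binds the calling thread to the next allowed CPU, round-robin.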
class thread_pinner {
public:
    thread_pinner() {
        tbb::detail::suppress_unused_warning(thread_index);
#if OS_AFFINITY_SYSCALL_PRESENT
        get_thread_affinity_mask(ncpus, free_indexes);
#endif
    }

    void pin_thread() {
#if OS_AFFINITY_SYSCALL_PRESENT
        pin_thread_imp(ncpus, free_indexes, thread_index);
#endif
    }

private:
#if OS_AFFINITY_SYSCALL_PRESENT
    std::size_t ncpus;
    std::vector<int> free_indexes{};
#endif
    std::atomic<int> thread_index{};
};

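// Observer that pins each arena worker thread to its own CPU the first time
// the thread enters the arena. A minimal usage sketch (assumed test pattern,
// not part of this header):
//     tbb::task_arena arena(int(utils::get_platform_max_threads()));
//     utils::pinning_observer observer(arena);
//     arena.execute([] { /* parallel work now runs on pinned threads */ });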
class pinning_observer : public tbb::task_scheduler_observer {
    thread_pinner pinner;
    tbb::enumerable_thread_specific<bool> register_threads;
public:
    pinning_observer(tbb::task_arena& arena) : tbb::task_scheduler_observer(arena), pinner() {
        observe(true);
    }

    void on_scheduler_entry(bool) override {
        bool& is_pinned = register_threads.local();
        if (is_pinned) return;

        pinner.pin_thread();

        is_pinned = true;
    }

    ~pinning_observer() {
        observe(false);
    }
};

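// Probes whether the process may raise its scheduling policy to SCHED_FIFO
// (on Linux this typically requires root or the CAP_SYS_NICE capability);
// the original policy and priority are restored before returning.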
bool can_change_thread_priority() {
#if __unix__
    pthread_t this_thread = pthread_self();
    sched_param old_params;
    int old_policy;
    int err = pthread_getschedparam(this_thread, &old_policy, &old_params);
    ASSERT(err == 0, nullptr);

    sched_param params;
    params.sched_priority = sched_get_priority_max(SCHED_FIFO);
    ASSERT(params.sched_priority != -1, nullptr);
    err = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
    if (err == 0) {
        err = pthread_setschedparam(this_thread, old_policy, &old_params);
        ASSERT(err == 0, nullptr);
    }
    return err == 0;
#else
    return false;
#endif
}

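// RAII guard: raises the calling thread to the maximal SCHED_FIFO priority on
// construction and restores the saved policy/priority on destruction.
// Typical (hypothetical) use, protecting a timing-sensitive region:
//     if (utils::can_change_thread_priority()) {
//         utils::increased_priority_guard guard; // elevated until scope exit
//         /* timing-sensitive work */
//     }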
#if __unix__
class increased_priority_guard {
public:
    increased_priority_guard() : m_backup(get_current_schedparam()) {
        increase_thread_priority();
    }

    ~increased_priority_guard() {
        // restore priority on destruction
        pthread_t this_thread = pthread_self();
        int err = pthread_setschedparam(this_thread,
            /*policy*/ m_backup.first, /*sched_param*/ &m_backup.second);
        ASSERT(err == 0, nullptr);
    }
private:
    std::pair<int, sched_param> get_current_schedparam() {
        pthread_t this_thread = pthread_self();
        sched_param params;
        int policy = 0;
        int err = pthread_getschedparam(this_thread, &policy, &params);
        ASSERT(err == 0, nullptr);
        return std::make_pair(policy, params);
    }

    void increase_thread_priority() {
        pthread_t this_thread = pthread_self();
        sched_param params;
        params.sched_priority = sched_get_priority_max(SCHED_FIFO);
        ASSERT(params.sched_priority != -1, nullptr);
        int err = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
        ASSERT(err == 0, "Can not change thread priority.");
    }

    std::pair<int, sched_param> m_backup;
};
#else
class increased_priority_guard {};
#endif


} // namespace utils

#endif // __TBB_test_common_utils_concurrency_limit_H