/*
    Copyright (c) 2020-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_test_common_utils_concurrency_limit_H
#define __TBB_test_common_utils_concurrency_limit_H

#include "config.h"
#include "utils_assert.h"
#include "utils_report.h"
#include "oneapi/tbb/task_arena.h"
#include "oneapi/tbb/task_scheduler_observer.h"
#include "oneapi/tbb/enumerable_thread_specific.h"

#include <cstddef>
#include <vector>
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <climits>
#include <utility>

#if _WIN32 || _WIN64
#include <windows.h>
#elif __unix__
#include <unistd.h>
#include <pthread.h>
#if __linux__
#include <sys/sysinfo.h>
#endif
#include <string.h>
#include <sched.h>
#if __FreeBSD__
#include <errno.h>
#include <sys/param.h>
#include <sys/cpuset.h>
#endif
#endif
#include <thread>

namespace utils {
using thread_num_type = std::size_t;

// Returns (and caches) the maximal concurrency available to this process.
inline thread_num_type get_platform_max_threads() {
    static thread_num_type platform_max_threads = tbb::this_task_arena::max_concurrency();
    return platform_max_threads;
}

// Builds a quasi-geometric range of thread counts in [1, max_threads]: the
// step between entries grows by 1 on each iteration, and max_threads itself
// is always included.
inline std::vector<thread_num_type> concurrency_range(thread_num_type max_threads) {
    std::vector<thread_num_type> threads_range;
    thread_num_type step = 1;
    for (thread_num_type thread_num = 1; thread_num <= max_threads; thread_num += step++)
        threads_range.push_back(thread_num);
    if (threads_range.back() != max_threads)
        threads_range.push_back(max_threads);
    // Rotate so that threads_range is non-monotonic
    std::rotate(threads_range.begin(), threads_range.begin() + threads_range.size()/2, threads_range.end());
    return threads_range;
}
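// For example, for max_threads == 16 the loop yields 1, 2, 4, 7, 11, 16,
// and the rotation turns the returned range into 7, 11, 16, 1, 2, 4.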

inline std::vector<thread_num_type> concurrency_range() {
    static std::vector<thread_num_type> threads_range = concurrency_range(get_platform_max_threads());
    return threads_range;
}
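// A minimal usage sketch (illustrative, not part of this header's API):
// a test iterates over the range and sizes a task_arena accordingly;
// `run_body_under_test` is a hypothetical callable standing in for real work.
//
//     for (utils::thread_num_type p : utils::concurrency_range()) {
//         tbb::task_arena arena(static_cast<int>(p));
//         arena.execute([&] { run_body_under_test(p); });
//     }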

#if !__TBB_TEST_SKIP_AFFINITY

static int maxProcs = 0;

static int get_max_procs() {
    if (!maxProcs) {
#if _WIN32||_WIN64
        DWORD_PTR pam, sam, m = 1;
        GetProcessAffinityMask( GetCurrentProcess(), &pam, &sam );
        int nproc = 0;
        for ( std::size_t i = 0; i < sizeof(DWORD_PTR) * CHAR_BIT; ++i, m <<= 1 ) {
            if ( pam & m )
                ++nproc;
        }
        maxProcs = nproc;
#elif __linux__
        cpu_set_t mask;
        int result = 0;
        sched_getaffinity(0, sizeof(cpu_set_t), &mask);
        // Scan the whole mask: the CPUs available to the process are not
        // necessarily the lowest-numbered ones.
        for (int i = 0; i < CPU_SETSIZE; ++i) {
            if (CPU_ISSET(i, &mask)) ++result;
        }
        maxProcs = result;
#else // FreeBSD
        maxProcs = sysconf(_SC_NPROCESSORS_ONLN);
#endif
    }
    return maxProcs;
}

inline int get_start_affinity_process() {
#if __linux__
    cpu_set_t mask;
    sched_getaffinity(0, sizeof(cpu_set_t), &mask);

    int result = -1;
    // Find the lowest CPU index available to this process.
    for (int i = 0; i < CPU_SETSIZE; ++i) {
        if (CPU_ISSET(i, &mask)) {
            result = i;
            break;
        }
    }
    ASSERT(result != -1, nullptr);
    return result;
#else
    // TODO: add affinity support for Windows and FreeBSD
    return 0;
#endif
}

inline int limit_number_of_threads( int max_threads ) {
    ASSERT(max_threads >= 1, "The limited number of threads should be positive");
    maxProcs = get_max_procs();
    if (maxProcs < max_threads) {
        // Assume the process affinity mask is unrestricted, so the number of
        // available threads equals maxProcs.
        return maxProcs;
    }
#if _WIN32 || _WIN64
    ASSERT(max_threads <= 64, "limit_number_of_threads does not support more than 64 threads on Windows");
    DWORD_PTR mask = 1;
    for (int i = 1; i < max_threads; ++i) {
        mask |= mask << 1;
    }
    bool err = !SetProcessAffinityMask(GetCurrentProcess(), mask);
#else
#if __linux__
    using mask_t = cpu_set_t;
#define setaffinity(mask) sched_setaffinity(getpid(), sizeof(mask_t), &mask)
#else /*__FreeBSD__*/
    using mask_t = cpuset_t;
#define setaffinity(mask) cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_PID, -1, sizeof(mask_t), &mask)
#endif

    mask_t new_mask;
    CPU_ZERO(&new_mask);

    int mask_size = int(sizeof(mask_t) * CHAR_BIT);
    if ( mask_size < maxProcs ) {
        REPORT("The mask size does not seem big enough for setaffinity; the call may return an error.");
    }

    ASSERT(max_threads <= int(sizeof(mask_t) * CHAR_BIT), "The mask size is not enough to set the requested number of threads.");
    int st = get_start_affinity_process();
    for (int i = st; i < st + max_threads; ++i) {
        CPU_SET(i, &new_mask);
    }
    int err = setaffinity(new_mask);
#endif
    ASSERT(!err, "Setting process affinity failed");
    return max_threads;
}
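// A usage sketch, assuming the call happens early in main(), before the TBB
// scheduler spawns its workers: the returned value, not the requested one, is
// the concurrency to test with, since the request is clamped to the number of
// available processors.
//
//     int granted = utils::limit_number_of_threads(4);
//     tbb::task_arena arena(granted);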

#endif // !__TBB_TEST_SKIP_AFFINITY

// TODO: consider using cpuset_setaffinity/sched_getaffinity on FreeBSD to enable the functionality
#define OS_AFFINITY_SYSCALL_PRESENT (__linux__ && !__ANDROID__)

#if OS_AFFINITY_SYSCALL_PRESENT
inline void get_thread_affinity_mask(std::size_t& ncpus, std::vector<int>& free_indexes) {
    cpu_set_t* mask = nullptr;
    ncpus = sizeof(cpu_set_t) * CHAR_BIT;
    // Retry with a progressively larger mask until sched_getaffinity accepts
    // its size; EINVAL means the kernel mask is wider than the one passed in.
    do {
        mask = CPU_ALLOC(ncpus);
        if (!mask) break;
        const size_t size = CPU_ALLOC_SIZE(ncpus);
        CPU_ZERO_S(size, mask);
        const int err = sched_getaffinity(0, size, mask);
        if (!err) break;

        CPU_FREE(mask);
        mask = nullptr;
        if (errno != EINVAL) break;
        ncpus <<= 1;
    } while (ncpus < 16 * 1024 /* some reasonable limit */ );
    ASSERT(mask, "Failed to obtain process affinity mask.");

    const size_t size = CPU_ALLOC_SIZE(ncpus);
    // Collect the indexes of all CPUs in the mask. Scan every bit rather than
    // only the first CPU_COUNT_S() indexes: the available CPUs are not
    // necessarily the lowest-numbered ones.
    for (std::size_t i = 0; i < ncpus; ++i) {
        if (CPU_ISSET_S(int(i), size, mask)) {
            free_indexes.push_back(int(i));
        }
    }

    CPU_FREE(mask);
}

// Pins the calling thread to the next CPU from free_indexes, round-robin.
inline void pin_thread_imp(std::size_t ncpus, std::vector<int>& free_indexes, std::atomic<int>& curr_idx) {
    const size_t size = CPU_ALLOC_SIZE(ncpus);

    ASSERT(free_indexes.size() > 0, nullptr);
    int mapped_idx = free_indexes[curr_idx++ % free_indexes.size()];

    cpu_set_t* target_mask = CPU_ALLOC(ncpus);
    ASSERT(target_mask, nullptr);
    CPU_ZERO_S(size, target_mask);
    CPU_SET_S(mapped_idx, size, target_mask);
    const int err = sched_setaffinity(0, size, target_mask);
    ASSERT(err == 0, "Failed to set thread affinity");

    CPU_FREE(target_mask);
}
#endif

class thread_pinner {
public:
    thread_pinner() {
        tbb::detail::suppress_unused_warning(thread_index);
#if OS_AFFINITY_SYSCALL_PRESENT
        get_thread_affinity_mask(ncpus, free_indexes);
#endif
    }

    void pin_thread() {
#if OS_AFFINITY_SYSCALL_PRESENT
        pin_thread_imp(ncpus, free_indexes, thread_index);
#endif
    }

private:
#if OS_AFFINITY_SYSCALL_PRESENT
    std::size_t ncpus;
    std::vector<int> free_indexes{};
#endif
    std::atomic<int> thread_index{};
};

class pinning_observer : public tbb::task_scheduler_observer {
    thread_pinner pinner;
    tbb::enumerable_thread_specific<bool> register_threads;
public:
    pinning_observer(tbb::task_arena& arena) : tbb::task_scheduler_observer(arena), pinner() {
        observe(true);
    }

    void on_scheduler_entry( bool ) override {
        // Pin each thread only once, on its first entry to the arena.
        bool& is_pinned = register_threads.local();
        if (is_pinned) return;

        pinner.pin_thread();

        is_pinned = true;
    }

    ~pinning_observer() {
        observe(false);
    }
};
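// A usage sketch: create the observer before submitting work so that each
// worker joining the arena gets pinned on its first scheduler entry.
//
//     tbb::task_arena arena(static_cast<int>(utils::get_platform_max_threads()));
//     utils::pinning_observer observer(arena);
//     arena.execute([] { /* parallel work now runs on pinned threads */ });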

// Checks whether the calling thread may raise its scheduling priority
// (on Linux this typically requires root privileges or CAP_SYS_NICE).
inline bool can_change_thread_priority() {
#if __unix__
    pthread_t this_thread = pthread_self();
    sched_param old_params;
    int old_policy;
    int err = pthread_getschedparam(this_thread, &old_policy, &old_params);
    ASSERT(err == 0, nullptr);

    sched_param params;
    params.sched_priority = sched_get_priority_max(SCHED_FIFO);
    ASSERT(params.sched_priority != -1, nullptr);
    err = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
    if (err == 0) {
        // The priority was raised successfully; restore the original policy.
        err = pthread_setschedparam(this_thread, old_policy, &old_params);
        ASSERT(err == 0, nullptr);
    }
    return err == 0;
#else
    return false;
#endif
}

#if __unix__
// RAII guard: raises the calling thread's priority to the maximal SCHED_FIFO
// value on construction and restores the original policy on destruction.
class increased_priority_guard {
public:
    increased_priority_guard() : m_backup(get_current_schedparam()) {
        increase_thread_priority();
    }

    ~increased_priority_guard() {
        // Restore the original policy and priority on destruction.
        pthread_t this_thread = pthread_self();
        int err = pthread_setschedparam(this_thread,
            /*policy*/ m_backup.first, /*sched_param*/ &m_backup.second);
        ASSERT(err == 0, nullptr);
    }
private:
    std::pair<int, sched_param> get_current_schedparam() {
        pthread_t this_thread = pthread_self();
        sched_param params;
        int policy = 0;
        int err = pthread_getschedparam(this_thread, &policy, &params);
        ASSERT(err == 0, nullptr);
        return std::make_pair(policy, params);
    }

    void increase_thread_priority() {
        pthread_t this_thread = pthread_self();
        sched_param params;
        params.sched_priority = sched_get_priority_max(SCHED_FIFO);
        ASSERT(params.sched_priority != -1, nullptr);
        int err = pthread_setschedparam(this_thread, SCHED_FIFO, &params);
        ASSERT(err == 0, "Cannot change thread priority.");
    }

    std::pair<int, sched_param> m_backup;
};
#else
    class increased_priority_guard {};
#endif
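// A usage sketch: check for permission first, since raising the priority to
// SCHED_FIFO typically requires elevated privileges; the guard's destructor
// restores the original policy when the scope ends.
//
//     if (utils::can_change_thread_priority()) {
//         utils::increased_priority_guard guard;
//         // ... timing-sensitive section runs at maximal priority ...
//     }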

} // namespace utils

#endif // __TBB_test_common_utils_concurrency_limit_H