/*
    Copyright (c) 2021-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#include "tbb/parallel_for.h"
#include "tbb/blocked_range.h"
#include "tbb/task_arena.h"
#include "tbb/global_control.h"
#include "oneapi/tbb/mutex.h"

#include "common/utils.h"
#include "common/utils_concurrency_limit.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"

#include <cstddef>
#include <utility>
#include <vector>
#include <algorithm> // std::min_element
#include <thread>    // std::this_thread::yield

//! \file test_partitioner.cpp
//! \brief Test for [internal] functionality

namespace task_affinity_retention {

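// Repeatedly runs a uniformly split parallel_for with tbb::static_partitioner in an arena
// twice as wide as the hardware concurrency, records which subrange each thread executes
// on every repeat, and returns the fraction of invocations in which a thread switched to
// a subrange different from the one it executed before.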
template <typename PerBodyFunc>
float test(PerBodyFunc&& body) {
    const std::size_t num_threads = 2 * utils::get_platform_max_threads();
    tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads);
    tbb::task_arena big_arena(static_cast<int>(num_threads));

#if __TBB_USE_THREAD_SANITIZER
    // Reduce execution time under Thread Sanitizer
    const std::size_t repeats = 50;
#elif EMSCRIPTEN
    // Reduce execution time for emscripten
    const std::size_t repeats = 10;
#else
    const std::size_t repeats = 100;
#endif
    const std::size_t per_thread_iters = 1000;

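    // Per-thread log of the [begin, end) subranges executed across all repeats.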
    using range = std::pair<std::size_t, std::size_t>;
    using execution_trace = std::vector< std::vector<range> >;

    execution_trace trace(num_threads);
    for (auto& v : trace)
        v.reserve(repeats);

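    // Each repeat executes the same uniformly split parallel_for; with perfect affinity
    // retention every thread receives the same subrange on every repeat.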
    for (std::size_t repeat = 0; repeat < repeats; ++repeat) {
        big_arena.execute([&] {
            tbb::parallel_for(
                tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads),
                [&](const tbb::blocked_range<std::size_t>& r) {
                    int thread_id = tbb::this_task_arena::current_thread_index();
                    trace[thread_id].emplace_back(r.begin(), r.end());

                    const bool is_uniform_split = r.size() == per_thread_iters;
                    CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly.");

                    std::this_thread::yield();

                    std::forward<PerBodyFunc>(body)();
                },
                tbb::static_partitioner()
            );
        });
        // TODO:
        //   - Consider introducing an observer to guarantee the threads left the arena.
    }

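    // Count how often a thread's executed subrange differs from the one it executed on the
    // previous repeat; comparisons start from the second recorded range of each thread.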
    std::size_t range_shifts = 0;
    for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) {
        auto trace_size = trace[thread_id].size();
        if (trace_size > 1) {
            auto previous_call_range = trace[thread_id][1];

            for (std::size_t invocation = 2; invocation < trace_size; ++invocation) {
                const auto& current_call_range = trace[thread_id][invocation];

                const bool is_range_changed = previous_call_range != current_call_range;
                if (is_range_changed) {
                    previous_call_range = current_call_range;
                    // the thread switched to a different subrange; count it as a shift
                    ++range_shifts;
                }
            }
        }

#if TBB_USE_DEBUG
        WARN_MESSAGE(
            trace_size <= repeats,
            "Thread " << thread_id << " executed " << trace_size - repeats
            << " extra ranges assigned to other threads."
        );
        WARN_MESSAGE(
            trace_size >= repeats,
            "Thread " << thread_id << " executed " << repeats - trace_size
            << " fewer ranges than expected."
        );
#endif
    }

#if TBB_USE_DEBUG
    WARN_MESSAGE(
        range_shifts == 0,
        "Threads changed their subranges " << range_shifts << " times out of "
        << num_threads * repeats - num_threads << " possible."
    );
#endif

    return float(range_shifts) / float(repeats * num_threads);
}

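// Without synchronization inside the body some stealing is tolerated: only warn if the
// subrange shifted in more than half of the invocations.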
void relaxed_test() {
    float range_shifts_part = test(/*per body invocation call*/[]{});
    const float require_tolerance = 0.5f;
    // TODO: investigate why switching could happen in more than half of the cases
    WARN_MESSAGE(
        (0 <= range_shifts_part && range_shifts_part <= require_tolerance),
        "Task affinitization was not respected in " << range_shifts_part * 100 << "% of the cases."
    );
}

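// The per-body barrier makes every worker in the arena participate in each parallel_for,
// so a fully affinitized run yields (almost) zero range shifts; retry until one such run
// is observed.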
void strict_test() {
    utils::SpinBarrier barrier(2 * utils::get_platform_max_threads());
    const float tolerance = 1e-5f;
    while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance);
}

} // namespace task_affinity_retention

//! Testing that affinitized tasks are not stolen
//! \brief \ref error_guessing
TEST_CASE("Threads respect task affinity") {
    task_affinity_retention::relaxed_test();
    task_affinity_retention::strict_test();
}

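// Runs parallel_for with static_partitioner over a custom range type and checks the chunk
// sizes produced: the body must be invoked exactly once per available thread, and no chunk
// may exceed diff_mult times the smallest chunk (plus a small rounding allowance).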
template <typename Range>
void test_custom_range(int diff_mult) {
    int num_trials = 100;

    std::vector<std::vector<std::size_t>> results(num_trials);
    oneapi::tbb::mutex results_mutex;

    for (int i = 0; i < num_trials; ++i) {
        oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) {
            oneapi::tbb::mutex::scoped_lock lock(results_mutex);
            results[i].push_back(r.size());
        }, oneapi::tbb::static_partitioner{});
    }

    for (auto& res : results) {
        REQUIRE(res.size() == utils::get_platform_max_threads());

        std::size_t min_size = *std::min_element(res.begin(), res.end());
        for (auto elem : res) {
            REQUIRE(min_size * diff_mult + 2 >= elem);
        }
    }
}


//! \brief \ref regression
TEST_CASE("Test partitioned tasks count and size for static_partitioner") {
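    // custom_range provides only the basic splitting constructor, so the test tolerates
    // chunks up to diff_mult == 2 times the smallest one.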
    class custom_range : public oneapi::tbb::blocked_range<int> {
        using base_type = oneapi::tbb::blocked_range<int>;
    public:
        custom_range(int l, int r, int g) : base_type(l, r, g) {}
        custom_range(const custom_range& r) : base_type(r) {}

        custom_range(custom_range& r, tbb::split) : base_type(r, tbb::split()) {}
    };

    test_custom_range<custom_range>(2);

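    // custom_range_with_psplit additionally provides a proportional splitting constructor,
    // so the test demands nearly equal chunks (diff_mult == 1).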
    class custom_range_with_psplit : public oneapi::tbb::blocked_range<int> {
        using base_type = oneapi::tbb::blocked_range<int>;
    public:
        custom_range_with_psplit(int l, int r, int g) : base_type(l, r, g) {}
        custom_range_with_psplit(const custom_range_with_psplit& r) : base_type(r) {}

        custom_range_with_psplit(custom_range_with_psplit& r, tbb::split) : base_type(r, tbb::split()) {}
        custom_range_with_psplit(custom_range_with_psplit& r, tbb::proportional_split& p) : base_type(r, p) {}
    };

    test_custom_range<custom_range_with_psplit>(1);
}