xref: /oneTBB/test/tbb/test_partitioner.cpp (revision bb45f092)
1 /*
2     Copyright (c) 2021-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
#include "common/test.h"

#include "tbb/parallel_for.h"
#include "tbb/task_arena.h"
#include "tbb/global_control.h"
#include "oneapi/tbb/mutex.h"

#include "common/utils.h"
#include "common/utils_concurrency_limit.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"

#include <cstddef>
#include <utility>
#include <vector>
#include <algorithm> // std::min_element
#include <thread> // std::this_thread::yield
33 
34 //! \file test_partitioner.cpp
35 //! \brief Test for [internal] functionality
36 
37 namespace task_affinity_retention {
38 
39 template <typename PerBodyFunc> float test(PerBodyFunc&& body) {
40     const std::size_t num_threads = 2 * utils::get_platform_max_threads();
41     tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads);
42     tbb::task_arena big_arena(static_cast<int>(num_threads));
43 
44 #if __TBB_USE_THREAD_SANITIZER
45     // Reduce execution time under Thread Sanitizer
46     const std::size_t repeats = 50;
47 #else
48     const std::size_t repeats = 100;
49 #endif
50     const std::size_t per_thread_iters = 1000;
51 
52     using range = std::pair<std::size_t, std::size_t>;
53     using execution_trace = std::vector< std::vector<range> >;
54 
55     execution_trace trace(num_threads);
56     for (auto& v : trace)
57         v.reserve(repeats);
58 
59     for (std::size_t repeat = 0; repeat < repeats; ++repeat) {
60         big_arena.execute([&] {
61             tbb::parallel_for(
62                 tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads),
63                 [&](const tbb::blocked_range<std::size_t>& r) {
64                     int thread_id = tbb::this_task_arena::current_thread_index();
65                     trace[thread_id].emplace_back(r.begin(), r.end());
66 
67                     const bool is_uniform_split = r.size() == per_thread_iters;
68                     CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly.");
69 
70                     std::this_thread::yield();
71 
72                     std::forward<PerBodyFunc>(body)();
73                 },
74                 tbb::static_partitioner()
75             );
76         });
77         // TODO:
78         //   - Consider introducing an observer to guarantee the threads left the arena.
79     }
80 
81     std::size_t range_shifts = 0;
82     for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) {
83         auto trace_size = trace[thread_id].size();
84         if (trace_size > 1) {
85             auto previous_call_range = trace[thread_id][1];
86 
87             for (std::size_t invocation = 2; invocation < trace_size; ++invocation) {
88                 const auto& current_call_range = trace[thread_id][invocation];
89 
90                 const bool is_range_changed = previous_call_range != current_call_range;
91                 if (is_range_changed) {
92                     previous_call_range = current_call_range;
93                     // count thread changes its execution strategy
94                     ++range_shifts;
95                 }
96             }
97         }
98 
99 #if TBB_USE_DEBUG
100         WARN_MESSAGE(
101             trace_size <= repeats,
102             "Thread " << thread_id << " executed extra " << trace_size - repeats
103             << " ranges assigned to other threads."
104         );
105         WARN_MESSAGE(
106             trace_size >= repeats,
107             "Thread " << thread_id << " executed " << repeats - trace_size
108             << " fewer ranges than expected."
109         );
110 #endif
111     }
112 
113 #if TBB_USE_DEBUG
114     WARN_MESSAGE(
115         range_shifts == 0,
116         "Threads change subranges " << range_shifts << " times out of "
117         << num_threads * repeats - num_threads << " possible."
118     );
119 #endif
120 
121     return float(range_shifts) / float(repeats * num_threads);
122 }
123 
124 void relaxed_test() {
125     float range_shifts_part = test(/*per body invocation call*/[]{});
126     const float require_tolerance = 0.5f;
127     // TODO: investigate why switching could happen in more than half of the cases
128     WARN_MESSAGE(
129         (0 <= range_shifts_part && range_shifts_part <= require_tolerance),
130         "Tasks affinitization was not respected in " << range_shifts_part * 100 << "% of the cases."
131     );
132 }
133 
134 void strict_test() {
135     utils::SpinBarrier barrier(2 * utils::get_platform_max_threads());
136     const float tolerance = 1e-5f;
137     while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance);
138 }
139 
140 } // namespace task_affinity_retention
141 
142 //! Testing affinitized tasks are not stolen
143 //! \brief \ref error_guessing
144 TEST_CASE("Threads respect task affinity") {
145     task_affinity_retention::relaxed_test();
146     task_affinity_retention::strict_test();
147 }
148 
149 template <typename Range>
150 void test_custom_range(int diff_mult) {
151     int num_trials = 100;
152 
153     std::vector<std::vector<std::size_t>> results(num_trials);
154     oneapi::tbb::mutex results_mutex;
155 
156     for (int i = 0; i < num_trials; ++i) {
157         oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) {
158             oneapi::tbb::mutex::scoped_lock lock(results_mutex);
159             results[i].push_back(r.size());
160         }, oneapi::tbb::static_partitioner{});
161     }
162 
163     for (auto& res : results) {
164         REQUIRE(res.size() == utils::get_platform_max_threads());
165 
166         std::size_t min_size = *std::min_element(res.begin(), res.end());
167         for (auto elem : res) {
168             REQUIRE(min_size * diff_mult + 2 >= elem);
169         }
170     }
171 }
172 
173 //! \brief \ref regression
174 TEST_CASE("Test partitioned tasks count and size for static_partitioner") {
175     class custom_range : public oneapi::tbb::blocked_range<int> {
176         using base_type = oneapi::tbb::blocked_range<int>;
177     public:
178         custom_range(int l, int r, int g) : base_type(l, r, g) {}
179         custom_range(const custom_range& r) : base_type(r) {}
180 
181         custom_range(custom_range& r, tbb::split) : base_type(r, tbb::split()) {}
182     };
183 
184     test_custom_range<custom_range>(2);
185 
186     class custom_range_with_psplit : public oneapi::tbb::blocked_range<int> {
187         using base_type = oneapi::tbb::blocked_range<int>;
188     public:
189         custom_range_with_psplit(int l, int r, int g) : base_type(l, r, g) {}
190         custom_range_with_psplit(const custom_range_with_psplit& r) : base_type(r) {}
191 
192         custom_range_with_psplit(custom_range_with_psplit& r, tbb::split) : base_type(r, tbb::split()) {}
193         custom_range_with_psplit(custom_range_with_psplit& r, tbb::proportional_split& p) : base_type(r, p) {}
194     };
195 
196     test_custom_range<custom_range_with_psplit>(1);
197 }
198