xref: /oneTBB/test/tbb/test_partitioner.cpp (revision f2e97f94)
1 /*
2     Copyright (c) 2021-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
#include "common/test.h"

#include "tbb/parallel_for.h"
#include "tbb/task_arena.h"
#include "tbb/global_control.h"
#include "oneapi/tbb/mutex.h"

#include "common/utils.h"
#include "common/utils_concurrency_limit.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"

#include <cstddef>
#include <utility>
#include <vector>
#include <algorithm> // std::min_element
#include <thread>    // std::this_thread::yield
33 
34 //! \file test_partitioner.cpp
35 //! \brief Test for [internal] functionality
36 
37 namespace task_affinity_retention {
38 
39 template <typename PerBodyFunc> float test(PerBodyFunc&& body) {
40     const std::size_t num_threads = 2 * utils::get_platform_max_threads();
41     tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads);
42     tbb::task_arena big_arena(static_cast<int>(num_threads));
43 
44     const std::size_t repeats = 100;
45     const std::size_t per_thread_iters = 1000;
46 
47     using range = std::pair<std::size_t, std::size_t>;
48     using execution_trace = std::vector< std::vector<range> >;
49 
50     execution_trace trace(num_threads);
51     for (auto& v : trace)
52         v.reserve(repeats);
53 
54     for (std::size_t repeat = 0; repeat < repeats; ++repeat) {
55         big_arena.execute([&] {
56             tbb::parallel_for(
57                 tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads),
58                 [&](const tbb::blocked_range<std::size_t>& r) {
59                     int thread_id = tbb::this_task_arena::current_thread_index();
60                     trace[thread_id].emplace_back(r.begin(), r.end());
61 
62                     const bool is_uniform_split = r.size() == per_thread_iters;
63                     CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly.");
64 
65                     std::this_thread::yield();
66 
67                     std::forward<PerBodyFunc>(body)();
68                 },
69                 tbb::static_partitioner()
70             );
71         });
72         // TODO:
73         //   - Consider introducing an observer to guarantee the threads left the arena.
74     }
75 
76     std::size_t range_shifts = 0;
77     for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) {
78         auto trace_size = trace[thread_id].size();
79         if (trace_size > 1) {
80             auto previous_call_range = trace[thread_id][1];
81 
82             for (std::size_t invocation = 2; invocation < trace_size; ++invocation) {
83                 const auto& current_call_range = trace[thread_id][invocation];
84 
85                 const bool is_range_changed = previous_call_range != current_call_range;
86                 if (is_range_changed) {
87                     previous_call_range = current_call_range;
88                     // count thread changes its execution strategy
89                     ++range_shifts;
90                 }
91             }
92         }
93 
94 #if TBB_USE_DEBUG
95         WARN_MESSAGE(
96             trace_size <= repeats,
97             "Thread " << thread_id << " executed extra " << trace_size - repeats
98             << " ranges assigned to other threads."
99         );
100         WARN_MESSAGE(
101             trace_size >= repeats,
102             "Thread " << thread_id << " executed " << repeats - trace_size
103             << " fewer ranges than expected."
104         );
105 #endif
106     }
107 
108 #if TBB_USE_DEBUG
109     WARN_MESSAGE(
110         range_shifts == 0,
111         "Threads change subranges " << range_shifts << " times out of "
112         << num_threads * repeats - num_threads << " possible."
113     );
114 #endif
115 
116     return float(range_shifts) / float(repeats * num_threads);
117 }
118 
119 void relaxed_test() {
120     float range_shifts_part = test(/*per body invocation call*/[]{});
121     const float require_tolerance = 0.5f;
122     // TODO: investigate why switching could happen in more than half of the cases
123     WARN_MESSAGE(
124         (0 <= range_shifts_part && range_shifts_part <= require_tolerance),
125         "Tasks affinitization was not respected in " << range_shifts_part * 100 << "% of the cases."
126     );
127 }
128 
129 void strict_test() {
130     utils::SpinBarrier barrier(2 * utils::get_platform_max_threads());
131     const float tolerance = 1e-5f;
132     while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance);
133 }
134 
135 } // namespace task_affinity_retention
136 
//! Testing affinitized tasks are not stolen
//! \brief \ref error_guessing
TEST_CASE("Threads respect task affinity") {
    // First a statistical check (warning only), then a strict loop that
    // retries until zero subrange reassignments occur under full per-body
    // synchronization.
    task_affinity_retention::relaxed_test();
    task_affinity_retention::strict_test();
}
143 
144 template <typename Range>
145 void test_custom_range(int diff_mult) {
146     int num_trials = 100;
147 
148     std::vector<std::vector<std::size_t>> results(num_trials);
149     oneapi::tbb::mutex results_mutex;
150 
151     for (int i = 0; i < num_trials; ++i) {
152         oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) {
153             oneapi::tbb::mutex::scoped_lock lock(results_mutex);
154             results[i].push_back(r.size());
155         }, oneapi::tbb::static_partitioner{});
156     }
157 
158     for (auto& res : results) {
159         REQUIRE(res.size() == utils::get_platform_max_threads());
160 
161         std::size_t min_size = *std::min_element(res.begin(), res.end());
162         for (auto elem : res) {
163             REQUIRE(min_size * diff_mult + 2 >= elem);
164         }
165     }
166 }
167 
168 //! \brief \ref regression
169 TEST_CASE("Test partitioned tasks count and size for static_partitioner") {
170     class custom_range : public oneapi::tbb::blocked_range<int> {
171         using base_type = oneapi::tbb::blocked_range<int>;
172     public:
173         custom_range(int l, int r, int g) : base_type(l, r, g) {}
174         custom_range(const custom_range& r) : base_type(r) {}
175 
176         custom_range(custom_range& r, tbb::split) : base_type(r, tbb::split()) {}
177     };
178 
179     test_custom_range<custom_range>(2);
180 
181     class custom_range_with_psplit : public oneapi::tbb::blocked_range<int> {
182         using base_type = oneapi::tbb::blocked_range<int>;
183     public:
184         custom_range_with_psplit(int l, int r, int g) : base_type(l, r, g) {}
185         custom_range_with_psplit(const custom_range_with_psplit& r) : base_type(r) {}
186 
187         custom_range_with_psplit(custom_range_with_psplit& r, tbb::split) : base_type(r, tbb::split()) {}
188         custom_range_with_psplit(custom_range_with_psplit& r, tbb::proportional_split& p) : base_type(r, p) {}
189     };
190 
191     test_custom_range<custom_range_with_psplit>(1);
192 }
193