1 /*
2 Copyright (c) 2021-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/test.h"
18
19 #include "tbb/parallel_for.h"
20 #include "tbb/task_arena.h"
21 #include "tbb/global_control.h"
22 #include "oneapi/tbb/mutex.h"
23
24 #include "common/utils.h"
25 #include "common/utils_concurrency_limit.h"
26 #include "common/dummy_body.h"
27 #include "common/spin_barrier.h"
28
29 #include <cstddef>
30 #include <utility>
31 #include <vector>
32 #include <algorithm> // std::min_element
33
34 //! \file test_partitioner.cpp
35 //! \brief Test for [internal] functionality
36
37 namespace task_affinity_retention {
38
// Core measurement routine: runs `repeats` rounds of an identically-shaped
// parallel_for with tbb::static_partitioner in an oversubscribed arena and
// counts how often a thread received a different subrange than on its
// previous round. Returns the fraction of such "range shifts" out of
// repeats * num_threads (0.0 == affinity fully retained).
// `body` is invoked once per executed subrange — e.g. a barrier wait to force
// all threads to participate in every round (see strict_test).
template <typename PerBodyFunc> float test(PerBodyFunc&& body) {
    // Oversubscribe: twice the hardware concurrency, so the scheduler has
    // realistic opportunities to steal/migrate subranges between threads.
    const std::size_t num_threads = 2 * utils::get_platform_max_threads();
    tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads);
    tbb::task_arena big_arena(static_cast<int>(num_threads));

#if __TBB_USE_THREAD_SANITIZER
    // Reduce execution time under Thread Sanitizer
    const std::size_t repeats = 50;
#elif EMSCRIPTEN
    // Reduce execution time for emscripten
    const std::size_t repeats = 10;
#else
    const std::size_t repeats = 100;
#endif
    const std::size_t per_thread_iters = 1000;

    // trace[t] holds the sequence of [begin, end) subranges thread t executed,
    // one entry per parallel_for body invocation across all repeats.
    using range = std::pair<std::size_t, std::size_t>;
    using execution_trace = std::vector< std::vector<range> >;

    execution_trace trace(num_threads);
    for (auto& v : trace)
        v.reserve(repeats); // expected: exactly one subrange per thread per repeat

    for (std::size_t repeat = 0; repeat < repeats; ++repeat) {
        big_arena.execute([&] {
            tbb::parallel_for(
                tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads),
                [&](const tbb::blocked_range<std::size_t>& r) {
                    // current_thread_index() is unique per thread within the
                    // arena, so each thread appends only to its own trace row.
                    int thread_id = tbb::this_task_arena::current_thread_index();
                    trace[thread_id].emplace_back(r.begin(), r.end());

                    // static_partitioner is expected to split the range into
                    // equal chunks of per_thread_iters items each.
                    const bool is_uniform_split = r.size() == per_thread_iters;
                    CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly.");

                    // Give other workers a chance to run between invocations.
                    std::this_thread::yield();

                    std::forward<PerBodyFunc>(body)();
                },
                tbb::static_partitioner()
            );
        });
        // TODO:
        // - Consider introducing an observer to guarantee the threads left the arena.
    }

    // Count, per thread, how many times the assigned subrange changed between
    // consecutive invocations. The very first recorded subrange (index 0) is
    // skipped — presumably treated as warm-up before affinity is established;
    // comparison starts from index 1.
    std::size_t range_shifts = 0;
    for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) {
        auto trace_size = trace[thread_id].size();
        if (trace_size > 1) {
            auto previous_call_range = trace[thread_id][1];

            for (std::size_t invocation = 2; invocation < trace_size; ++invocation) {
                const auto& current_call_range = trace[thread_id][invocation];

                const bool is_range_changed = previous_call_range != current_call_range;
                if (is_range_changed) {
                    previous_call_range = current_call_range;
                    // count thread changes its execution strategy
                    ++range_shifts;
                }
            }
        }

#if TBB_USE_DEBUG
        // NOTE(review): each unsigned subtraction below is printed only on the
        // side where its condition failed, so it should not underflow —
        // assumes the message expression is evaluated lazily on failure;
        // confirm against the doctest macro semantics in common/test.h.
        WARN_MESSAGE(
            trace_size <= repeats,
            "Thread " << thread_id << " executed extra " << trace_size - repeats
            << " ranges assigned to other threads."
        );
        WARN_MESSAGE(
            trace_size >= repeats,
            "Thread " << thread_id << " executed " << repeats - trace_size
            << " fewer ranges than expected."
        );
#endif
    }

#if TBB_USE_DEBUG
    WARN_MESSAGE(
        range_shifts == 0,
        "Threads change subranges " << range_shifts << " times out of "
        << num_threads * repeats - num_threads << " possible."
    );
#endif

    return float(range_shifts) / float(repeats * num_threads);
}
126
relaxed_test()127 void relaxed_test() {
128 float range_shifts_part = test(/*per body invocation call*/[]{});
129 const float require_tolerance = 0.5f;
130 // TODO: investigate why switching could happen in more than half of the cases
131 WARN_MESSAGE(
132 (0 <= range_shifts_part && range_shifts_part <= require_tolerance),
133 "Tasks affinitization was not respected in " << range_shifts_part * 100 << "% of the cases."
134 );
135 }
136
strict_test()137 void strict_test() {
138 utils::SpinBarrier barrier(2 * utils::get_platform_max_threads());
139 const float tolerance = 1e-5f;
140 while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance);
141 }
142
143 } // namespace task_affinity_retention
144
//! Testing affinitized tasks are not stolen
//! \brief \ref error_guessing
TEST_CASE("Threads respect task affinity") {
    // Warn-only check first, then the strict variant that retries until a
    // migration-free run is observed.
    task_affinity_retention::relaxed_test();
    task_affinity_retention::strict_test();
}
151
152 template <typename Range>
test_custom_range(int diff_mult)153 void test_custom_range(int diff_mult) {
154 int num_trials = 100;
155
156 std::vector<std::vector<std::size_t>> results(num_trials);
157 oneapi::tbb::mutex results_mutex;
158
159 for (int i = 0; i < num_trials; ++i) {
160 oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) {
161 oneapi::tbb::mutex::scoped_lock lock(results_mutex);
162 results[i].push_back(r.size());
163 }, oneapi::tbb::static_partitioner{});
164 }
165
166 for (auto& res : results) {
167 REQUIRE(res.size() == utils::get_platform_max_threads());
168
169 std::size_t min_size = *std::min_element(res.begin(), res.end());
170 for (auto elem : res) {
171 REQUIRE(min_size * diff_mult + 2 >= elem);
172 }
173 }
174 }
175
176 //! \brief \ref regression
177 TEST_CASE("Test partitioned tasks count and size for static_partitioner") {
178 class custom_range : public oneapi::tbb::blocked_range<int> {
179 using base_type = oneapi::tbb::blocked_range<int>;
180 public:
custom_range(int l,int r,int g)181 custom_range(int l, int r, int g) : base_type(l, r, g) {}
custom_range(const custom_range & r)182 custom_range(const custom_range& r) : base_type(r) {}
183
custom_range(custom_range & r,tbb::split)184 custom_range(custom_range& r, tbb::split) : base_type(r, tbb::split()) {}
185 };
186
187 test_custom_range<custom_range>(2);
188
189 class custom_range_with_psplit : public oneapi::tbb::blocked_range<int> {
190 using base_type = oneapi::tbb::blocked_range<int>;
191 public:
custom_range_with_psplit(int l,int r,int g)192 custom_range_with_psplit(int l, int r, int g) : base_type(l, r, g) {}
custom_range_with_psplit(const custom_range_with_psplit & r)193 custom_range_with_psplit(const custom_range_with_psplit& r) : base_type(r) {}
194
custom_range_with_psplit(custom_range_with_psplit & r,tbb::split)195 custom_range_with_psplit(custom_range_with_psplit& r, tbb::split) : base_type(r, tbb::split()) {}
custom_range_with_psplit(custom_range_with_psplit & r,tbb::proportional_split & p)196 custom_range_with_psplit(custom_range_with_psplit& r, tbb::proportional_split& p) : base_type(r, p) {}
197 };
198
199 test_custom_range<custom_range_with_psplit>(1);
200 }
201