1 /* 2 Copyright (c) 2021-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #include "tbb/parallel_for.h" 20 #include "tbb/task_arena.h" 21 #include "tbb/global_control.h" 22 #include "oneapi/tbb/mutex.h" 23 24 #include "common/utils.h" 25 #include "common/utils_concurrency_limit.h" 26 #include "common/dummy_body.h" 27 #include "common/spin_barrier.h" 28 29 #include <cstddef> 30 #include <utility> 31 #include <vector> 32 #include <algorithm> // std::min_element 33 34 //! \file test_partitioner.cpp 35 //! \brief Test for [internal] functionality 36 37 namespace task_affinity_retention { 38 39 template <typename PerBodyFunc> float test(PerBodyFunc&& body) { 40 const std::size_t num_threads = 2 * utils::get_platform_max_threads(); 41 tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads); 42 tbb::task_arena big_arena(static_cast<int>(num_threads)); 43 44 #if __TBB_USE_THREAD_SANITIZER 45 // Reduce execution time under Thread Sanitizer 46 const std::size_t repeats = 50; 47 #else 48 const std::size_t repeats = 100; 49 #endif 50 const std::size_t per_thread_iters = 1000; 51 52 using range = std::pair<std::size_t, std::size_t>; 53 using execution_trace = std::vector< std::vector<range> >; 54 55 execution_trace trace(num_threads); 56 for (auto& v : trace) 57 v.reserve(repeats); 58 59 for (std::size_t repeat = 0; repeat < repeats; ++repeat) { 60 big_arena.execute([&] { 61 tbb::parallel_for( 62 tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads), 63 [&](const tbb::blocked_range<std::size_t>& r) { 64 int thread_id = tbb::this_task_arena::current_thread_index(); 65 trace[thread_id].emplace_back(r.begin(), r.end()); 66 67 const bool is_uniform_split = r.size() == per_thread_iters; 68 CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly."); 69 70 std::this_thread::yield(); 71 72 std::forward<PerBodyFunc>(body)(); 73 }, 74 tbb::static_partitioner() 75 ); 76 }); 77 // TODO: 78 // - Consider introducing an observer to guarantee the threads left the arena. 79 } 80 81 std::size_t range_shifts = 0; 82 for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) { 83 auto trace_size = trace[thread_id].size(); 84 if (trace_size > 1) { 85 auto previous_call_range = trace[thread_id][1]; 86 87 for (std::size_t invocation = 2; invocation < trace_size; ++invocation) { 88 const auto& current_call_range = trace[thread_id][invocation]; 89 90 const bool is_range_changed = previous_call_range != current_call_range; 91 if (is_range_changed) { 92 previous_call_range = current_call_range; 93 // count thread changes its execution strategy 94 ++range_shifts; 95 } 96 } 97 } 98 99 #if TBB_USE_DEBUG 100 WARN_MESSAGE( 101 trace_size <= repeats, 102 "Thread " << thread_id << " executed extra " << trace_size - repeats 103 << " ranges assigned to other threads." 104 ); 105 WARN_MESSAGE( 106 trace_size >= repeats, 107 "Thread " << thread_id << " executed " << repeats - trace_size 108 << " fewer ranges than expected." 109 ); 110 #endif 111 } 112 113 #if TBB_USE_DEBUG 114 WARN_MESSAGE( 115 range_shifts == 0, 116 "Threads change subranges " << range_shifts << " times out of " 117 << num_threads * repeats - num_threads << " possible." 118 ); 119 #endif 120 121 return float(range_shifts) / float(repeats * num_threads); 122 } 123 124 void relaxed_test() { 125 float range_shifts_part = test(/*per body invocation call*/[]{}); 126 const float require_tolerance = 0.5f; 127 // TODO: investigate why switching could happen in more than half of the cases 128 WARN_MESSAGE( 129 (0 <= range_shifts_part && range_shifts_part <= require_tolerance), 130 "Tasks affinitization was not respected in " << range_shifts_part * 100 << "% of the cases." 131 ); 132 } 133 134 void strict_test() { 135 utils::SpinBarrier barrier(2 * utils::get_platform_max_threads()); 136 const float tolerance = 1e-5f; 137 while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance); 138 } 139 140 } // namespace task_affinity_retention 141 142 //! Testing affinitized tasks are not stolen 143 //! \brief \ref error_guessing 144 TEST_CASE("Threads respect task affinity") { 145 task_affinity_retention::relaxed_test(); 146 task_affinity_retention::strict_test(); 147 } 148 149 template <typename Range> 150 void test_custom_range(int diff_mult) { 151 int num_trials = 100; 152 153 std::vector<std::vector<std::size_t>> results(num_trials); 154 oneapi::tbb::mutex results_mutex; 155 156 for (int i = 0; i < num_trials; ++i) { 157 oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) { 158 oneapi::tbb::mutex::scoped_lock lock(results_mutex); 159 results[i].push_back(r.size()); 160 }, oneapi::tbb::static_partitioner{}); 161 } 162 163 for (auto& res : results) { 164 REQUIRE(res.size() == utils::get_platform_max_threads()); 165 166 std::size_t min_size = *std::min_element(res.begin(), res.end()); 167 for (auto elem : res) { 168 REQUIRE(min_size * diff_mult + 2 >= elem); 169 } 170 } 171 } 172 173 //! \brief \ref regression 174 TEST_CASE("Test partitioned tasks count and size for static_partitioner") { 175 class custom_range : public oneapi::tbb::blocked_range<int> { 176 using base_type = oneapi::tbb::blocked_range<int>; 177 public: 178 custom_range(int l, int r, int g) : base_type(l, r, g) {} 179 custom_range(const custom_range& r) : base_type(r) {} 180 181 custom_range(custom_range& r, tbb::split) : base_type(r, tbb::split()) {} 182 }; 183 184 test_custom_range<custom_range>(2); 185 186 class custom_range_with_psplit : public oneapi::tbb::blocked_range<int> { 187 using base_type = oneapi::tbb::blocked_range<int>; 188 public: 189 custom_range_with_psplit(int l, int r, int g) : base_type(l, r, g) {} 190 custom_range_with_psplit(const custom_range_with_psplit& r) : base_type(r) {} 191 192 custom_range_with_psplit(custom_range_with_psplit& r, tbb::split) : base_type(r, tbb::split()) {} 193 custom_range_with_psplit(custom_range_with_psplit& r, tbb::proportional_split& p) : base_type(r, p) {} 194 }; 195 196 test_custom_range<custom_range_with_psplit>(1); 197 } 198