1 /* 2 Copyright (c) 2021-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #include "tbb/parallel_for.h" 20 #include "tbb/task_arena.h" 21 #include "tbb/global_control.h" 22 #include "oneapi/tbb/mutex.h" 23 24 #include "common/utils.h" 25 #include "common/utils_concurrency_limit.h" 26 #include "common/dummy_body.h" 27 #include "common/spin_barrier.h" 28 29 #include <cstddef> 30 #include <utility> 31 #include <vector> 32 #include <algorithm> // std::min_element 33 34 //! \file test_partitioner.cpp 35 //! \brief Test for [internal] functionality 36 37 namespace task_affinity_retention { 38 39 template <typename PerBodyFunc> float test(PerBodyFunc&& body) { 40 const std::size_t num_threads = 2 * utils::get_platform_max_threads(); 41 tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads); 42 tbb::task_arena big_arena(static_cast<int>(num_threads)); 43 44 #if __TBB_USE_THREAD_SANITIZER 45 // Reduce execution time under Thread Sanitizer 46 const std::size_t repeats = 50; 47 #elif EMSCRIPTEN 48 // Reduce execution time for emscripten 49 const std::size_t repeats = 10; 50 #else 51 const std::size_t repeats = 100; 52 #endif 53 const std::size_t per_thread_iters = 1000; 54 55 using range = std::pair<std::size_t, std::size_t>; 56 using execution_trace = std::vector< std::vector<range> >; 57 58 execution_trace trace(num_threads); 59 for (auto& v : trace) 60 v.reserve(repeats); 61 62 for (std::size_t repeat = 0; repeat < repeats; ++repeat) { 63 big_arena.execute([&] { 64 tbb::parallel_for( 65 tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads), 66 [&](const tbb::blocked_range<std::size_t>& r) { 67 int thread_id = tbb::this_task_arena::current_thread_index(); 68 trace[thread_id].emplace_back(r.begin(), r.end()); 69 70 const bool is_uniform_split = r.size() == per_thread_iters; 71 CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly."); 72 73 std::this_thread::yield(); 74 75 std::forward<PerBodyFunc>(body)(); 76 }, 77 tbb::static_partitioner() 78 ); 79 }); 80 // TODO: 81 // - Consider introducing an observer to guarantee the threads left the arena. 82 } 83 84 std::size_t range_shifts = 0; 85 for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) { 86 auto trace_size = trace[thread_id].size(); 87 if (trace_size > 1) { 88 auto previous_call_range = trace[thread_id][1]; 89 90 for (std::size_t invocation = 2; invocation < trace_size; ++invocation) { 91 const auto& current_call_range = trace[thread_id][invocation]; 92 93 const bool is_range_changed = previous_call_range != current_call_range; 94 if (is_range_changed) { 95 previous_call_range = current_call_range; 96 // count thread changes its execution strategy 97 ++range_shifts; 98 } 99 } 100 } 101 102 #if TBB_USE_DEBUG 103 WARN_MESSAGE( 104 trace_size <= repeats, 105 "Thread " << thread_id << " executed extra " << trace_size - repeats 106 << " ranges assigned to other threads." 107 ); 108 WARN_MESSAGE( 109 trace_size >= repeats, 110 "Thread " << thread_id << " executed " << repeats - trace_size 111 << " fewer ranges than expected." 112 ); 113 #endif 114 } 115 116 #if TBB_USE_DEBUG 117 WARN_MESSAGE( 118 range_shifts == 0, 119 "Threads change subranges " << range_shifts << " times out of " 120 << num_threads * repeats - num_threads << " possible." 121 ); 122 #endif 123 124 return float(range_shifts) / float(repeats * num_threads); 125 } 126 127 void relaxed_test() { 128 float range_shifts_part = test(/*per body invocation call*/[]{}); 129 const float require_tolerance = 0.5f; 130 // TODO: investigate why switching could happen in more than half of the cases 131 WARN_MESSAGE( 132 (0 <= range_shifts_part && range_shifts_part <= require_tolerance), 133 "Tasks affinitization was not respected in " << range_shifts_part * 100 << "% of the cases." 134 ); 135 } 136 137 void strict_test() { 138 utils::SpinBarrier barrier(2 * utils::get_platform_max_threads()); 139 const float tolerance = 1e-5f; 140 while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance); 141 } 142 143 } // namespace task_affinity_retention 144 145 //! Testing affinitized tasks are not stolen 146 //! \brief \ref error_guessing 147 TEST_CASE("Threads respect task affinity") { 148 task_affinity_retention::relaxed_test(); 149 task_affinity_retention::strict_test(); 150 } 151 152 template <typename Range> 153 void test_custom_range(int diff_mult) { 154 int num_trials = 100; 155 156 std::vector<std::vector<std::size_t>> results(num_trials); 157 oneapi::tbb::mutex results_mutex; 158 159 for (int i = 0; i < num_trials; ++i) { 160 oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) { 161 oneapi::tbb::mutex::scoped_lock lock(results_mutex); 162 results[i].push_back(r.size()); 163 }, oneapi::tbb::static_partitioner{}); 164 } 165 166 for (auto& res : results) { 167 REQUIRE(res.size() == utils::get_platform_max_threads()); 168 169 std::size_t min_size = *std::min_element(res.begin(), res.end()); 170 for (auto elem : res) { 171 REQUIRE(min_size * diff_mult + 2 >= elem); 172 } 173 } 174 } 175 176 //! \brief \ref regression 177 TEST_CASE("Test partitioned tasks count and size for static_partitioner") { 178 class custom_range : public oneapi::tbb::blocked_range<int> { 179 using base_type = oneapi::tbb::blocked_range<int>; 180 public: 181 custom_range(int l, int r, int g) : base_type(l, r, g) {} 182 custom_range(const custom_range& r) : base_type(r) {} 183 184 custom_range(custom_range& r, tbb::split) : base_type(r, tbb::split()) {} 185 }; 186 187 test_custom_range<custom_range>(2); 188 189 class custom_range_with_psplit : public oneapi::tbb::blocked_range<int> { 190 using base_type = oneapi::tbb::blocked_range<int>; 191 public: 192 custom_range_with_psplit(int l, int r, int g) : base_type(l, r, g) {} 193 custom_range_with_psplit(const custom_range_with_psplit& r) : base_type(r) {} 194 195 custom_range_with_psplit(custom_range_with_psplit& r, tbb::split) : base_type(r, tbb::split()) {} 196 custom_range_with_psplit(custom_range_with_psplit& r, tbb::proportional_split& p) : base_type(r, p) {} 197 }; 198 199 test_custom_range<custom_range_with_psplit>(1); 200 } 201