1 /* 2 Copyright (c) 2021-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #include "tbb/parallel_for.h" 20 #include "tbb/task_arena.h" 21 #include "tbb/global_control.h" 22 #include "oneapi/tbb/mutex.h" 23 24 #include "common/utils.h" 25 #include "common/utils_concurrency_limit.h" 26 #include "common/dummy_body.h" 27 #include "common/spin_barrier.h" 28 29 #include <cstddef> 30 #include <utility> 31 #include <vector> 32 #include <algorithm> // std::min_element 33 34 //! \file test_partitioner.cpp 35 //! 
\brief Test for [internal] functionality

namespace task_affinity_retention {

// Measures how well the static_partitioner keeps tasks affinitized to the
// same threads across repeated parallel_for invocations.
//
// The range [0, per_thread_iters * num_threads) is executed `repeats` times in
// an arena sized to twice the platform concurrency (oversubscription makes
// stealing more likely if affinity is not respected). Each thread records the
// subrange it executed on every invocation; afterwards, the number of times a
// thread's subrange differed from its previously recorded one is counted.
//
// \param body callable invoked once per body execution (e.g. a no-op, or a
//             barrier wait to force all threads to participate).
// \return fraction of observed subrange shifts, normalized by the total number
//         of body invocations (repeats * num_threads); 0 means perfect
//         affinity retention.
template <typename PerBodyFunc> float test(PerBodyFunc&& body) {
    // Oversubscribe: twice as many threads as the hardware concurrency.
    const std::size_t num_threads = 2 * utils::get_platform_max_threads();
    tbb::global_control concurrency(tbb::global_control::max_allowed_parallelism, num_threads);
    tbb::task_arena big_arena(static_cast<int>(num_threads));

    const std::size_t repeats = 100;
    const std::size_t per_thread_iters = 1000;

    // One recorded [begin, end) pair per parallel_for invocation, per thread.
    using range = std::pair<std::size_t, std::size_t>;
    using execution_trace = std::vector< std::vector<range> >;

    execution_trace trace(num_threads);
    for (auto& v : trace)
        // Ideally each thread executes exactly one subrange per repeat.
        v.reserve(repeats);

    for (std::size_t repeat = 0; repeat < repeats; ++repeat) {
        big_arena.execute([&] {
            tbb::parallel_for(
                tbb::blocked_range<std::size_t>(0, per_thread_iters * num_threads),
                [&](const tbb::blocked_range<std::size_t>& r) {
                    // Slot index within the arena; each thread appends only to
                    // its own trace vector, so no synchronization is needed.
                    int thread_id = tbb::this_task_arena::current_thread_index();
                    trace[thread_id].emplace_back(r.begin(), r.end());

                    // static_partitioner is expected to split the range into
                    // equal per-thread chunks.
                    const bool is_uniform_split = r.size() == per_thread_iters;
                    CHECK_MESSAGE(is_uniform_split, "static partitioner split the range incorrectly.");

                    std::this_thread::yield();

                    std::forward<PerBodyFunc>(body)();
                },
                tbb::static_partitioner()
            );
        });
        // TODO:
        // - Consider introducing an observer to guarantee the threads left the arena.
    }

    // Count, per thread, how many times the executed subrange changed between
    // consecutive invocations.
    std::size_t range_shifts = 0;
    for (std::size_t thread_id = 0; thread_id < num_threads; ++thread_id) {
        auto trace_size = trace[thread_id].size();
        if (trace_size > 1) {
            // NOTE(review): the comparison deliberately starts from the second
            // recorded range (index 1), skipping the very first invocation —
            // presumably because affinity is only established after the first
            // run; TODO confirm.
            auto previous_call_range = trace[thread_id][1];

            for (std::size_t invocation = 2; invocation < trace_size; ++invocation) {
                const auto& current_call_range = trace[thread_id][invocation];

                const bool is_range_changed = previous_call_range != current_call_range;
                if (is_range_changed) {
                    previous_call_range = current_call_range;
                    // count thread changes its execution strategy
                    ++range_shifts;
                }
            }
        }

#if TBB_USE_DEBUG
        // More entries than repeats means the thread also executed subranges
        // originally assigned to other threads; fewer means some of its own
        // subranges were stolen. Warnings only — not hard failures.
        WARN_MESSAGE(
            trace_size <= repeats,
            "Thread " << thread_id << " executed extra " << trace_size - repeats
            << " ranges assigned to other threads."
        );
        WARN_MESSAGE(
            trace_size >= repeats,
            "Thread " << thread_id << " executed " << repeats - trace_size
            << " fewer ranges than expected."
        );
#endif
    }

#if TBB_USE_DEBUG
    WARN_MESSAGE(
        range_shifts == 0,
        "Threads change subranges " << range_shifts << " times out of "
        << num_threads * repeats - num_threads << " possible."
    );
#endif

    return float(range_shifts) / float(repeats * num_threads);
}

// Smoke check: runs the measurement with a no-op body and only warns (does not
// fail) if subrange shifts happen in more than half of the cases.
void relaxed_test() {
    float range_shifts_part = test(/*per body invocation call*/[]{});
    const float require_tolerance = 0.5f;
    // TODO: investigate why switching could happen in more than half of the cases
    WARN_MESSAGE(
        (0 <= range_shifts_part && range_shifts_part <= require_tolerance),
        "Tasks affinitization was not respected in " << range_shifts_part * 100 << "% of the cases."
    );
}

// Strict check: the barrier forces all 2 * max_threads workers to rendezvous
// inside every body invocation, so the parallel_for cannot finish unless every
// thread participates. Retries until a run with (near-)zero shifts is observed.
void strict_test() {
    utils::SpinBarrier barrier(2 * utils::get_platform_max_threads());
    const float tolerance = 1e-5f;
    while (test(/*per body invocation call*/[&barrier] { barrier.wait(); }) > tolerance);
}

} // namespace task_affinity_retention

//!
Testing affinitized tasks are not stolen 138 //! \brief \ref error_guessing 139 TEST_CASE("Threads respect task affinity") { 140 task_affinity_retention::relaxed_test(); 141 task_affinity_retention::strict_test(); 142 } 143 144 template <typename Range> 145 void test_custom_range(int diff_mult) { 146 int num_trials = 100; 147 148 std::vector<std::vector<std::size_t>> results(num_trials); 149 oneapi::tbb::mutex results_mutex; 150 151 for (int i = 0; i < num_trials; ++i) { 152 oneapi::tbb::parallel_for(Range(0, int(100 * utils::get_platform_max_threads()), 1), [&] (const Range& r) { 153 oneapi::tbb::mutex::scoped_lock lock(results_mutex); 154 results[i].push_back(r.size()); 155 }, oneapi::tbb::static_partitioner{}); 156 } 157 158 for (auto& res : results) { 159 REQUIRE(res.size() == utils::get_platform_max_threads()); 160 161 std::size_t min_size = *std::min_element(res.begin(), res.end()); 162 for (auto elem : res) { 163 REQUIRE(min_size * diff_mult + 2 >= elem); 164 } 165 } 166 } 167 168 //! 
\brief \ref regression
TEST_CASE("Test partitioned tasks count and size for static_partitioner") {
    // Range that supports only halving splits: without a proportional-split
    // constructor the partitioner may produce chunks differing by up to 2x.
    class halving_only_range : public oneapi::tbb::blocked_range<int> {
        using blocked_base = oneapi::tbb::blocked_range<int>;
    public:
        halving_only_range(int lo, int hi, int grain) : blocked_base(lo, hi, grain) {}
        halving_only_range(const halving_only_range& other) : blocked_base(other) {}

        halving_only_range(halving_only_range& other, tbb::split) : blocked_base(other, tbb::split()) {}
    };

    test_custom_range<halving_only_range>(2);

    // Range that additionally supports proportional splitting, allowing the
    // partitioner to produce (nearly) equal chunks.
    class proportional_range : public oneapi::tbb::blocked_range<int> {
        using blocked_base = oneapi::tbb::blocked_range<int>;
    public:
        proportional_range(int lo, int hi, int grain) : blocked_base(lo, hi, grain) {}
        proportional_range(const proportional_range& other) : blocked_base(other) {}

        proportional_range(proportional_range& other, tbb::split) : blocked_base(other, tbb::split()) {}
        proportional_range(proportional_range& other, tbb::proportional_split& proportion) : blocked_base(other, proportion) {}
    };

    test_custom_range<proportional_range>(1);
}