1 /* 2 Copyright (c) 2020-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #include "tbb/task_group.h" 20 #include "tbb/task_arena.h" 21 #include "tbb/global_control.h" 22 23 #include "common/spin_barrier.h" 24 #include "common/utils.h" 25 #include "common/utils_concurrency_limit.h" 26 27 #include <cstddef> 28 #include <algorithm> 29 #include <numeric> 30 31 //! \file test_arena_priorities.cpp 32 //! \brief Test for [scheduler.task_arena] specification 33 34 //--------------------------------------------------// 35 36 std::vector<tbb::task_arena::priority> g_task_info; 37 38 std::atomic<unsigned> g_task_num; 39 40 std::atomic<bool> g_work_submitted; 41 42 namespace HighPriorityArenasTakeExecutionPrecedence { 43 44 using concurrency_type = int; 45 using arena_info = 46 std::tuple<std::unique_ptr<tbb::task_arena>, 47 concurrency_type, 48 tbb::task_arena::priority, 49 std::unique_ptr<tbb::task_group>>; 50 51 enum arena_info_keys { 52 arena_pointer, arena_concurrency, arena_priority, associated_task_group 53 }; 54 55 void prepare_logging_data(std::vector<tbb::task_arena::priority>& task_log, unsigned overall_tasks_num) { 56 task_log.clear(); 57 task_log.resize(overall_tasks_num); 58 for( auto& record : task_log ) 59 record = tbb::task_arena::priority::normal; 60 } 61 62 template<typename... ArenaArgs> 63 tbb::task_arena* do_allocate_and_construct( const ArenaArgs&... arena_args ) 64 { 65 const int dummy_max_concurrency = 4; 66 const int dummy_reserved_for_masters = 4; 67 68 enum initialization_methods { 69 lazy, 70 explicit_initialize, 71 explicit_initialize_with_different_constructor_parameters, 72 initialization_methods_num 73 }; 74 static initialization_methods initialization_method = lazy; 75 76 tbb::task_arena* result_arena = nullptr; 77 78 switch( initialization_method ) { 79 80 case lazy: 81 result_arena = new tbb::task_arena( arena_args... ); 82 break; 83 84 case explicit_initialize: 85 result_arena = new tbb::task_arena; 86 result_arena->initialize( arena_args... ); 87 break; 88 89 case explicit_initialize_with_different_constructor_parameters: 90 { 91 tbb::task_arena tmp(dummy_max_concurrency, dummy_reserved_for_masters); 92 result_arena = new tbb::task_arena(tmp); 93 result_arena->initialize(arena_args...); 94 break; 95 } 96 97 default: 98 REQUIRE_MESSAGE( false, "Not implemented method of initialization." ); 99 break; 100 } 101 102 int next_value = (initialization_method + 1) % initialization_methods_num; 103 initialization_method = (initialization_methods)next_value; 104 105 return result_arena; 106 } 107 108 template<typename FirstArenaArg> 109 tbb::task_arena* decide_on_arguments( 110 const FirstArenaArg& first_arg, const int reserved_for_masters, 111 tbb::task_arena::priority a_priority ) 112 { 113 const tbb::task_arena::priority default_priority = tbb::task_arena::priority::normal; 114 static bool pass_default_priority_implicitly = false; 115 if( default_priority == a_priority ) { 116 pass_default_priority_implicitly = !pass_default_priority_implicitly; 117 if( pass_default_priority_implicitly ) 118 return do_allocate_and_construct( first_arg, reserved_for_masters ); 119 } 120 return do_allocate_and_construct( first_arg, reserved_for_masters, a_priority ); 121 } 122 123 124 tbb::task_arena* allocate_and_construct_arena( 125 int arena_max_concurrency, tbb::task_arena::priority a_priority ) 126 { 127 const int reserved_for_masters = 0; 128 129 static bool use_constraints = false; 130 use_constraints = !use_constraints; 131 132 if( use_constraints ) { 133 tbb::task_arena::constraints properties{tbb::task_arena::automatic, arena_max_concurrency}; 134 return decide_on_arguments( properties, reserved_for_masters, a_priority ); 135 } 136 137 return decide_on_arguments( arena_max_concurrency, reserved_for_masters, a_priority ); 138 } 139 140 void submit_work( std::vector<arena_info>& arenas, unsigned repeats, utils::SpinBarrier& barrier ) { 141 for( auto& item : arenas ) { 142 tbb::task_arena& arena = *std::get<arena_pointer>(item).get(); 143 concurrency_type concurrency = std::get<arena_concurrency>(item); 144 tbb::task_arena::priority priority_value = std::get<arena_priority>(item); 145 auto& tg = std::get<associated_task_group>(item); 146 147 arena.execute( 148 [repeats, &barrier, &tg, priority_value, concurrency]() { 149 for( unsigned i = 0; i < repeats * concurrency; ++i ) { 150 tg->run( 151 [&barrier, priority_value](){ 152 while( !g_work_submitted.load(std::memory_order_acquire) ) 153 utils::yield(); 154 g_task_info[g_task_num++] = priority_value; 155 barrier.wait(); 156 } 157 ); 158 } 159 } // arena work submission functor 160 ); 161 } 162 } 163 164 void wait_work_completion( 165 std::vector<arena_info>& arenas, std::size_t max_num_threads, unsigned overall_tasks_num ) 166 { 167 if( max_num_threads > 1 ) 168 while( g_task_num < overall_tasks_num ) 169 utils::yield(); 170 171 for( auto& item : arenas ) { 172 tbb::task_arena& arena = *std::get<arena_pointer>(item).get(); 173 auto& tg = std::get<associated_task_group>(item); 174 arena.execute( [&tg]() { tg->wait(); } ); 175 } 176 CHECK_MESSAGE(g_task_num == overall_tasks_num, "Not all tasks were executed."); 177 } 178 179 void test() { 180 181 const std::size_t max_num_threads = utils::get_platform_max_threads(); 182 183 tbb::global_control control(tbb::global_control::max_allowed_parallelism, max_num_threads + 1); 184 concurrency_type signed_max_num_threads = static_cast<int>(max_num_threads); 185 if (1 == max_num_threads) { 186 // Skipping workerless case 187 return; 188 } 189 190 INFO( "max_num_threads = " << max_num_threads ); 191 // TODO: iterate over threads to see that the work is going on in low priority arena. 192 193 const int min_arena_concurrency = 2; // implementation detail 194 195 tbb::task_arena::priority high = tbb::task_arena::priority::high; 196 tbb::task_arena::priority normal = tbb::task_arena::priority::normal; 197 tbb::task_arena::priority low = tbb::task_arena::priority::low; 198 199 // TODO: use vector or std::array of priorities instead of the c-style array. 200 201 // TODO: consider extending priorities to have more than three arenas. 202 203 tbb::task_arena::priority priorities[] = {high, normal, low}; // keep it sorted 204 const unsigned priorities_num = sizeof(priorities) / sizeof(priorities[0]); 205 const unsigned overall_arenas_num = priorities_num; 206 207 std::vector<arena_info> arenas; 208 209 std::vector<unsigned> progressing_arenas( overall_arenas_num, 0 ); 210 std::iota( progressing_arenas.begin(), progressing_arenas.end(), 1 ); 211 212 for( const auto& progressing_arenas_num : progressing_arenas ) { 213 214 INFO( "progressing_arenas_num = " << progressing_arenas_num ); 215 216 // TODO: consider populating vector with arenas in separate function. 217 unsigned adjusted_progressing_arenas = progressing_arenas_num; 218 219 arenas.clear(); 220 g_task_num = 0; 221 222 concurrency_type projected_concurrency = 223 (signed_max_num_threads + progressing_arenas_num - 1) / progressing_arenas_num; 224 projected_concurrency = std::max(min_arena_concurrency, projected_concurrency); // implementation detail 225 adjusted_progressing_arenas = signed_max_num_threads / projected_concurrency; 226 227 int threads_left = signed_max_num_threads; 228 229 // Instantiate arenas with necessary concurrency so that progressing arenas consume all 230 // available threads. 231 for( unsigned arena_idx = 0; arena_idx < overall_arenas_num; ++arena_idx ) { 232 tbb::task_arena::priority a_priority = priorities[arena_idx]; 233 234 concurrency_type concurrency = projected_concurrency; 235 concurrency_type actual_concurrency = projected_concurrency; 236 if( threads_left < actual_concurrency || 237 arena_idx == adjusted_progressing_arenas - 1 ) // give all remaining threads to last progressing arena 238 { 239 concurrency = actual_concurrency = threads_left; 240 } 241 242 threads_left -= actual_concurrency; 243 244 if( !actual_concurrency ) { 245 concurrency = tbb::task_arena::automatic; 246 actual_concurrency = signed_max_num_threads; 247 } 248 actual_concurrency = std::max( min_arena_concurrency, actual_concurrency ); // implementation detail 249 250 tbb::task_arena* arena = allocate_and_construct_arena(concurrency, a_priority); 251 arenas.push_back( 252 std::make_tuple( 253 std::unique_ptr<tbb::task_arena>(arena), 254 actual_concurrency, 255 a_priority, 256 std::unique_ptr<tbb::task_group>(new tbb::task_group) 257 ) 258 ); 259 } 260 261 std::rotate( arenas.begin(), arenas.begin() + progressing_arenas_num - 1, arenas.end() ); 262 263 const unsigned repeats = 10; 264 265 unsigned overall_tasks_num = 0; 266 for( auto& item : arenas ) 267 overall_tasks_num += std::get<arena_concurrency>(item) * repeats; 268 269 prepare_logging_data( g_task_info, overall_tasks_num ); 270 271 g_work_submitted = false; 272 273 utils::SpinBarrier barrier{ max_num_threads }; 274 submit_work( arenas, repeats, barrier ); 275 276 g_work_submitted = true; 277 278 wait_work_completion( arenas, max_num_threads, overall_tasks_num ); 279 280 std::map<tbb::task_arena::priority, unsigned> wasted_tasks; 281 282 tbb::task_arena::priority* end_ptr = priorities + adjusted_progressing_arenas; 283 284 { 285 // First epoch - check progressing arenas only 286 unsigned overall_progressing_arenas_tasks_num = 0; 287 std::map<tbb::task_arena::priority, unsigned> per_priority_tasks_num; 288 289 // Due to indeterministic submission of tasks in the beginning, count tasks priorities up 290 // to additional epoch. Assume threads are rebalanced once the work is submitted. 291 unsigned last_task_idx = std::min((repeats + 1) * unsigned(max_num_threads), overall_tasks_num); 292 for( unsigned i = 0; i < last_task_idx; ++i ) { 293 tbb::task_arena::priority p = g_task_info[i]; 294 ++per_priority_tasks_num[p]; 295 296 overall_progressing_arenas_tasks_num += (int)( 297 end_ptr != std::find(priorities, end_ptr, p) 298 ); 299 300 if( i < max_num_threads || i >= repeats * max_num_threads ) 301 ++wasted_tasks[p]; 302 } 303 304 unsigned expected_overall_progressing_arenas_tasks_num = 0; 305 for( unsigned i = 0; i < adjusted_progressing_arenas; ++i ) { 306 tbb::task_arena::priority p = priorities[i]; 307 concurrency_type concurrency = 0; 308 for( auto& item : arenas ) { 309 if( std::get<arena_priority>(item) == p ) { 310 concurrency = std::get<arena_concurrency>(item); 311 break; 312 } 313 } 314 unsigned expected_tasks_num = repeats * concurrency; 315 316 CHECK_MESSAGE( expected_tasks_num == per_priority_tasks_num[p], 317 "Unexpected number of executed tasks in arena with index " << i << " and concurrency = " << concurrency ) ; 318 319 expected_overall_progressing_arenas_tasks_num += expected_tasks_num; 320 } 321 CHECK_MESSAGE( 322 expected_overall_progressing_arenas_tasks_num == overall_progressing_arenas_tasks_num, 323 "Number of tasks for progressing arenas mismatched." 324 ); 325 } 326 { 327 // Other epochs - check remaining arenas 328 std::map<tbb::task_arena::priority, unsigned> per_priority_tasks_num; 329 330 std::size_t lower_priority_start = (repeats + 1) * max_num_threads; 331 for( std::size_t i = lower_priority_start; i < overall_tasks_num; ++i ) 332 ++per_priority_tasks_num[ g_task_info[i] ]; 333 334 for( auto& e : per_priority_tasks_num ) { 335 auto priority = e.first; 336 auto tasks_num = e.second; 337 338 auto priorities_it = std::find( end_ptr, priorities + priorities_num, priority ); 339 CHECK_MESSAGE( priorities_it != priorities + priorities_num, 340 "Tasks from prioritized arena got deferred." ); 341 342 auto it = std::find_if( 343 arenas.begin(), arenas.end(), 344 [priority](arena_info& info) { 345 return std::get<arena_priority>(info) == priority; 346 } 347 ); 348 auto per_arena_tasks_num = repeats * std::get<arena_concurrency>(*it); 349 CHECK_MESSAGE( 350 tasks_num == per_arena_tasks_num - wasted_tasks[priority], 351 "Incorrect number of tasks from deferred (non-progressing) arenas were executed." 352 ); 353 } 354 } // Other epochs 355 } // loop over simultaneously progressing arenas 356 357 INFO( "Done\n" ); 358 } 359 360 } // namespace HighPriorityArenasTakeExecutionPrecedence 361 362 363 // TODO: nested arena case 364 365 //! Test for setting a priority to arena 366 //! \brief \ref requirement 367 TEST_CASE("Arena priorities") { 368 HighPriorityArenasTakeExecutionPrecedence::test(); 369 } 370