1 /*
2 Copyright (c) 2020-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
#include "common/test.h"

#include "tbb/global_control.h"
#include "tbb/task_arena.h"
#include "tbb/task_group.h"

#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_concurrency_limit.h"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <map>
#include <memory>
#include <numeric>
#include <tuple>
#include <vector>
30
31 //! \file test_arena_priorities.cpp
32 //! \brief Test for [scheduler.task_arena] specification
33
34 //--------------------------------------------------//
35
// Execution log: each entry records the priority of the arena whose task
// wrote it, in the order tasks actually ran.
std::vector<tbb::task_arena::priority> g_task_info;

// Next free slot in g_task_info; doubles as the count of executed tasks.
std::atomic<unsigned> g_task_num;

// Raised once all work has been submitted; tasks spin on this flag so that
// execution order reflects arena priorities rather than submission order.
std::atomic<bool> g_work_submitted;
41
42 namespace HighPriorityArenasTakeExecutionPrecedence {
43
using concurrency_type = int;
// Everything the test tracks per arena: the arena itself, the concurrency it
// was given, its priority, and the task_group used to submit work into it.
using arena_info =
    std::tuple<std::unique_ptr<tbb::task_arena>,
               concurrency_type,
               tbb::task_arena::priority,
               std::unique_ptr<tbb::task_group>>;

// Named indices into arena_info for readable std::get<> access.
enum arena_info_keys {
    arena_pointer, arena_concurrency, arena_priority, associated_task_group
};
54
prepare_logging_data(std::vector<tbb::task_arena::priority> & task_log,unsigned overall_tasks_num)55 void prepare_logging_data(std::vector<tbb::task_arena::priority>& task_log, unsigned overall_tasks_num) {
56 task_log.clear();
57 task_log.resize(overall_tasks_num);
58 for( auto& record : task_log )
59 record = tbb::task_arena::priority::normal;
60 }
61
62 template<typename... ArenaArgs>
do_allocate_and_construct(const ArenaArgs &...arena_args)63 tbb::task_arena* do_allocate_and_construct( const ArenaArgs&... arena_args )
64 {
65 const int dummy_max_concurrency = 4;
66 const int dummy_reserved_for_masters = 4;
67
68 enum initialization_methods {
69 lazy,
70 explicit_initialize,
71 explicit_initialize_with_different_constructor_parameters,
72 initialization_methods_num
73 };
74 static initialization_methods initialization_method = lazy;
75
76 tbb::task_arena* result_arena = nullptr;
77
78 switch( initialization_method ) {
79
80 case lazy:
81 result_arena = new tbb::task_arena( arena_args... );
82 break;
83
84 case explicit_initialize:
85 result_arena = new tbb::task_arena;
86 result_arena->initialize( arena_args... );
87 break;
88
89 case explicit_initialize_with_different_constructor_parameters:
90 {
91 tbb::task_arena tmp(dummy_max_concurrency, dummy_reserved_for_masters);
92 result_arena = new tbb::task_arena(tmp);
93 result_arena->initialize(arena_args...);
94 break;
95 }
96
97 default:
98 REQUIRE_MESSAGE( false, "Not implemented method of initialization." );
99 break;
100 }
101
102 int next_value = (initialization_method + 1) % initialization_methods_num;
103 initialization_method = (initialization_methods)next_value;
104
105 return result_arena;
106 }
107
108 template<typename FirstArenaArg>
decide_on_arguments(const FirstArenaArg & first_arg,const int reserved_for_masters,tbb::task_arena::priority a_priority)109 tbb::task_arena* decide_on_arguments(
110 const FirstArenaArg& first_arg, const int reserved_for_masters,
111 tbb::task_arena::priority a_priority )
112 {
113 const tbb::task_arena::priority default_priority = tbb::task_arena::priority::normal;
114 static bool pass_default_priority_implicitly = false;
115 if( default_priority == a_priority ) {
116 pass_default_priority_implicitly = !pass_default_priority_implicitly;
117 if( pass_default_priority_implicitly )
118 return do_allocate_and_construct( first_arg, reserved_for_masters );
119 }
120 return do_allocate_and_construct( first_arg, reserved_for_masters, a_priority );
121 }
122
123
allocate_and_construct_arena(int arena_max_concurrency,tbb::task_arena::priority a_priority)124 tbb::task_arena* allocate_and_construct_arena(
125 int arena_max_concurrency, tbb::task_arena::priority a_priority )
126 {
127 const int reserved_for_masters = 0;
128
129 static bool use_constraints = false;
130 use_constraints = !use_constraints;
131
132 if( use_constraints ) {
133 tbb::task_arena::constraints properties{tbb::task_arena::automatic, arena_max_concurrency};
134 return decide_on_arguments( properties, reserved_for_masters, a_priority );
135 }
136
137 return decide_on_arguments( arena_max_concurrency, reserved_for_masters, a_priority );
138 }
139
submit_work(std::vector<arena_info> & arenas,unsigned repeats,utils::SpinBarrier & barrier)140 void submit_work( std::vector<arena_info>& arenas, unsigned repeats, utils::SpinBarrier& barrier ) {
141 for( auto& item : arenas ) {
142 tbb::task_arena& arena = *std::get<arena_pointer>(item).get();
143 concurrency_type concurrency = std::get<arena_concurrency>(item);
144 tbb::task_arena::priority priority_value = std::get<arena_priority>(item);
145 auto& tg = std::get<associated_task_group>(item);
146
147 arena.execute(
148 [repeats, &barrier, &tg, priority_value, concurrency]() {
149 for( unsigned i = 0; i < repeats * concurrency; ++i ) {
150 tg->run(
151 [&barrier, priority_value](){
152 while( !g_work_submitted.load(std::memory_order_acquire) )
153 utils::yield();
154 g_task_info[g_task_num++] = priority_value;
155 barrier.wait();
156 }
157 );
158 }
159 } // arena work submission functor
160 );
161 }
162 }
163
wait_work_completion(std::vector<arena_info> & arenas,std::size_t max_num_threads,unsigned overall_tasks_num)164 void wait_work_completion(
165 std::vector<arena_info>& arenas, std::size_t max_num_threads, unsigned overall_tasks_num )
166 {
167 if( max_num_threads > 1 )
168 while( g_task_num < overall_tasks_num )
169 utils::yield();
170
171 for( auto& item : arenas ) {
172 tbb::task_arena& arena = *std::get<arena_pointer>(item).get();
173 auto& tg = std::get<associated_task_group>(item);
174 arena.execute( [&tg]() { tg->wait(); } );
175 }
176 CHECK_MESSAGE(g_task_num == overall_tasks_num, "Not all tasks were executed.");
177 }
178
// Drives the whole scenario: builds three arenas (high/normal/low priority),
// sizes them so that only the first `progressing_arenas_num` of them can get
// threads, floods them with barrier-synchronized tasks, and then checks the
// execution log to confirm higher-priority arenas ran before lower ones.
void test() {

    const std::size_t max_num_threads = utils::get_platform_max_threads();

    // +1 slot for the external (master) thread that calls execute().
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, max_num_threads + 1);
    concurrency_type signed_max_num_threads = static_cast<int>(max_num_threads);
    if (1 == max_num_threads) {
        // Skipping workerless case
        return;
    }

    INFO( "max_num_threads = " << max_num_threads );
    // TODO: iterate over threads to see that the work is going on in low priority arena.

    const int min_arena_concurrency = 2; // implementation detail

    tbb::task_arena::priority high = tbb::task_arena::priority::high;
    tbb::task_arena::priority normal = tbb::task_arena::priority::normal;
    tbb::task_arena::priority low = tbb::task_arena::priority::low;

    // TODO: use vector or std::array of priorities instead of the c-style array.

    // TODO: consider extending priorities to have more than three arenas.

    tbb::task_arena::priority priorities[] = {high, normal, low}; // keep it sorted
    const unsigned priorities_num = sizeof(priorities) / sizeof(priorities[0]);
    const unsigned overall_arenas_num = priorities_num;

    std::vector<arena_info> arenas;

    // progressing_arenas = {1, 2, ..., overall_arenas_num}: how many arenas
    // are expected to make progress simultaneously in each outer iteration.
    std::vector<unsigned> progressing_arenas( overall_arenas_num, 0 );
    std::iota( progressing_arenas.begin(), progressing_arenas.end(), 1 );

    for( const auto& progressing_arenas_num : progressing_arenas ) {

        INFO( "progressing_arenas_num = " << progressing_arenas_num );

        // TODO: consider populating vector with arenas in separate function.
        unsigned adjusted_progressing_arenas = progressing_arenas_num;

        arenas.clear();
        g_task_num = 0;

        // Split the available threads evenly (rounding up) among the arenas
        // that should progress; the minimum-concurrency clamp may reduce how
        // many arenas can actually progress at once.
        concurrency_type projected_concurrency =
            (signed_max_num_threads + progressing_arenas_num - 1) / progressing_arenas_num;
        projected_concurrency = std::max(min_arena_concurrency, projected_concurrency); // implementation detail
        adjusted_progressing_arenas = signed_max_num_threads / projected_concurrency;

        int threads_left = signed_max_num_threads;

        // Instantiate arenas with necessary concurrency so that progressing arenas consume all
        // available threads.
        for( unsigned arena_idx = 0; arena_idx < overall_arenas_num; ++arena_idx ) {
            tbb::task_arena::priority a_priority = priorities[arena_idx];

            concurrency_type concurrency = projected_concurrency;
            concurrency_type actual_concurrency = projected_concurrency;
            if( threads_left < actual_concurrency ||
                arena_idx == adjusted_progressing_arenas - 1 ) // give all remaining threads to last progressing arena
            {
                concurrency = actual_concurrency = threads_left;
            }

            threads_left -= actual_concurrency;

            // No threads left: this arena is a non-progressing one. Request
            // automatic concurrency but account for the full thread count,
            // since its tasks will eventually run on all threads.
            if( !actual_concurrency ) {
                concurrency = tbb::task_arena::automatic;
                actual_concurrency = signed_max_num_threads;
            }
            actual_concurrency = std::max( min_arena_concurrency, actual_concurrency ); // implementation detail

            tbb::task_arena* arena = allocate_and_construct_arena(concurrency, a_priority);
            arenas.push_back(
                std::make_tuple(
                    std::unique_ptr<tbb::task_arena>(arena),
                    actual_concurrency,
                    a_priority,
                    std::unique_ptr<tbb::task_group>(new tbb::task_group)
                )
            );
        }

        // Rotate so that submission order differs from priority order; the
        // scheduler, not submission order, must decide what runs first.
        std::rotate( arenas.begin(), arenas.begin() + progressing_arenas_num - 1, arenas.end() );

        const unsigned repeats = 10;

        unsigned overall_tasks_num = 0;
        for( auto& item : arenas )
            overall_tasks_num += std::get<arena_concurrency>(item) * repeats;

        prepare_logging_data( g_task_info, overall_tasks_num );

        g_work_submitted = false;

        utils::SpinBarrier barrier{ max_num_threads };
        submit_work( arenas, repeats, barrier );

        g_work_submitted = true;

        wait_work_completion( arenas, max_num_threads, overall_tasks_num );

        // Tasks executed outside their arena's "clean" epochs (start-up and
        // wind-down), per priority; they are excluded from the later checks.
        std::map<tbb::task_arena::priority, unsigned> wasted_tasks;

        // priorities[0 .. adjusted_progressing_arenas) are the arenas expected
        // to progress; end_ptr marks the boundary (priorities array is sorted).
        tbb::task_arena::priority* end_ptr = priorities + adjusted_progressing_arenas;

        {
            // First epoch - check progressing arenas only
            unsigned overall_progressing_arenas_tasks_num = 0;
            std::map<tbb::task_arena::priority, unsigned> per_priority_tasks_num;

            // Due to indeterministic submission of tasks in the beginning, count tasks priorities up
            // to additional epoch. Assume threads are rebalanced once the work is submitted.
            unsigned last_task_idx = std::min((repeats + 1) * unsigned(max_num_threads), overall_tasks_num);
            for( unsigned i = 0; i < last_task_idx; ++i ) {
                tbb::task_arena::priority p = g_task_info[i];
                ++per_priority_tasks_num[p];

                // Count the task if its priority belongs to a progressing arena.
                overall_progressing_arenas_tasks_num += (int)(
                    end_ptr != std::find(priorities, end_ptr, p)
                );

                if( i < max_num_threads || i >= repeats * max_num_threads )
                    ++wasted_tasks[p];
            }

            unsigned expected_overall_progressing_arenas_tasks_num = 0;
            for( unsigned i = 0; i < adjusted_progressing_arenas; ++i ) {
                tbb::task_arena::priority p = priorities[i];
                concurrency_type concurrency = 0;
                for( auto& item : arenas ) {
                    if( std::get<arena_priority>(item) == p ) {
                        concurrency = std::get<arena_concurrency>(item);
                        break;
                    }
                }
                unsigned expected_tasks_num = repeats * concurrency;

                CHECK_MESSAGE( expected_tasks_num == per_priority_tasks_num[p],
                               "Unexpected number of executed tasks in arena with index " << i << " and concurrency = " << concurrency ) ;

                expected_overall_progressing_arenas_tasks_num += expected_tasks_num;
            }
            CHECK_MESSAGE(
                expected_overall_progressing_arenas_tasks_num == overall_progressing_arenas_tasks_num,
                "Number of tasks for progressing arenas mismatched."
            );
        }
        {
            // Other epochs - check remaining arenas
            std::map<tbb::task_arena::priority, unsigned> per_priority_tasks_num;

            std::size_t lower_priority_start = (repeats + 1) * max_num_threads;
            for( std::size_t i = lower_priority_start; i < overall_tasks_num; ++i )
                ++per_priority_tasks_num[ g_task_info[i] ];

            for( auto& e : per_priority_tasks_num ) {
                auto priority = e.first;
                auto tasks_num = e.second;

                // Only deferred (non-progressing) priorities may appear here.
                auto priorities_it = std::find( end_ptr, priorities + priorities_num, priority );
                CHECK_MESSAGE( priorities_it != priorities + priorities_num,
                               "Tasks from prioritized arena got deferred." );

                auto it = std::find_if(
                    arenas.begin(), arenas.end(),
                    [priority](arena_info& info) {
                        return std::get<arena_priority>(info) == priority;
                    }
                );
                auto per_arena_tasks_num = repeats * std::get<arena_concurrency>(*it);
                CHECK_MESSAGE(
                    tasks_num == per_arena_tasks_num - wasted_tasks[priority],
                    "Incorrect number of tasks from deferred (non-progressing) arenas were executed."
                );
            }
        } // Other epochs
    } // loop over simultaneously progressing arenas

    INFO( "Done\n" );
}
359
360 } // namespace HighPriorityArenasTakeExecutionPrecedence
361
362
// TODO: nested arena case
//! Test for setting a priority to arena
//! \brief \ref requirement
TEST_CASE("Arena priorities") {
    // Checks that arenas with higher priority get worker threads first.
    HighPriorityArenasTakeExecutionPrecedence::test();
}
369