xref: /oneTBB/test/tbb/test_arena_priorities.cpp (revision c4568449)
1 /*
2     Copyright (c) 2020-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 
19 #include "tbb/task_group.h"
20 #include "tbb/task_arena.h"
21 #include "tbb/global_control.h"
22 
23 #include "common/spin_barrier.h"
24 #include "common/utils.h"
25 #include "common/utils_concurrency_limit.h"
26 
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <map>
#include <memory>
#include <numeric>
#include <tuple>
#include <vector>
30 
31 //! \file test_arena_priorities.cpp
32 //! \brief Test for [scheduler.task_arena] specification
33 
34 //--------------------------------------------------//
35 
// Execution log: slot i records the priority of the arena whose task was the
// i-th to run. Sized and reset by prepare_logging_data() before each round.
std::vector<tbb::task_arena::priority> g_task_info;

// Index of the next free slot in g_task_info; doubles as the running count of
// executed tasks.
std::atomic<unsigned> g_task_num;

// Gate that keeps all submitted tasks spinning until every arena has had its
// work enqueued, so the scheduler can rebalance threads by arena priority
// before any task body runs.
std::atomic<bool> g_work_submitted;
41 
42 namespace HighPriorityArenasTakeExecutionPrecedence {
43 
// Signed concurrency value (tbb::task_arena expresses max_concurrency as int).
using concurrency_type = int;

// Per-arena record tracked by the test: the arena itself, its actual
// concurrency, its priority, and the task_group used to submit and wait work.
using arena_info =
    std::tuple<std::unique_ptr<tbb::task_arena>,
               concurrency_type,
               tbb::task_arena::priority,
               std::unique_ptr<tbb::task_group>>;

// Named indices into arena_info for use with std::get<>.
enum arena_info_keys {
    arena_pointer, arena_concurrency, arena_priority, associated_task_group
};
54 
prepare_logging_data(std::vector<tbb::task_arena::priority> & task_log,unsigned overall_tasks_num)55 void prepare_logging_data(std::vector<tbb::task_arena::priority>& task_log, unsigned overall_tasks_num) {
56     task_log.clear();
57     task_log.resize(overall_tasks_num);
58     for( auto& record : task_log )
59         record = tbb::task_arena::priority::normal;
60 }
61 
62 template<typename... ArenaArgs>
do_allocate_and_construct(const ArenaArgs &...arena_args)63 tbb::task_arena* do_allocate_and_construct( const ArenaArgs&... arena_args )
64 {
65     const int dummy_max_concurrency = 4;
66     const int dummy_reserved_for_masters = 4;
67 
68     enum initialization_methods {
69         lazy,
70         explicit_initialize,
71         explicit_initialize_with_different_constructor_parameters,
72         initialization_methods_num
73     };
74     static initialization_methods initialization_method = lazy;
75 
76     tbb::task_arena* result_arena = nullptr;
77 
78     switch( initialization_method ) {
79 
80     case lazy:
81         result_arena = new tbb::task_arena( arena_args... );
82         break;
83 
84     case explicit_initialize:
85         result_arena = new tbb::task_arena;
86         result_arena->initialize( arena_args... );
87         break;
88 
89     case explicit_initialize_with_different_constructor_parameters:
90     {
91         tbb::task_arena tmp(dummy_max_concurrency, dummy_reserved_for_masters);
92         result_arena = new tbb::task_arena(tmp);
93         result_arena->initialize(arena_args...);
94         break;
95     }
96 
97     default:
98         REQUIRE_MESSAGE( false, "Not implemented method of initialization." );
99         break;
100     }
101 
102     int next_value = (initialization_method + 1) % initialization_methods_num;
103     initialization_method = (initialization_methods)next_value;
104 
105     return result_arena;
106 }
107 
108 template<typename FirstArenaArg>
decide_on_arguments(const FirstArenaArg & first_arg,const int reserved_for_masters,tbb::task_arena::priority a_priority)109 tbb::task_arena* decide_on_arguments(
110     const FirstArenaArg& first_arg, const int reserved_for_masters,
111     tbb::task_arena::priority a_priority )
112 {
113     const tbb::task_arena::priority default_priority = tbb::task_arena::priority::normal;
114     static bool pass_default_priority_implicitly = false;
115     if( default_priority == a_priority ) {
116         pass_default_priority_implicitly = !pass_default_priority_implicitly;
117         if( pass_default_priority_implicitly )
118             return do_allocate_and_construct( first_arg, reserved_for_masters );
119     }
120     return do_allocate_and_construct( first_arg, reserved_for_masters, a_priority );
121 }
122 
123 
allocate_and_construct_arena(int arena_max_concurrency,tbb::task_arena::priority a_priority)124 tbb::task_arena* allocate_and_construct_arena(
125     int arena_max_concurrency, tbb::task_arena::priority a_priority )
126 {
127     const int reserved_for_masters = 0;
128 
129     static bool use_constraints = false;
130     use_constraints = !use_constraints;
131 
132     if( use_constraints ) {
133         tbb::task_arena::constraints properties{tbb::task_arena::automatic, arena_max_concurrency};
134         return decide_on_arguments( properties, reserved_for_masters, a_priority );
135     }
136 
137     return decide_on_arguments( arena_max_concurrency, reserved_for_masters, a_priority );
138 }
139 
// Enqueues repeats * concurrency tasks into each arena's task_group. Each task
// spins until g_work_submitted is raised, then appends its arena's priority to
// g_task_info and rendezvouses with the other currently running tasks on the
// barrier, forcing lock-step epochs of max_num_threads tasks each.
void submit_work( std::vector<arena_info>& arenas, unsigned repeats, utils::SpinBarrier& barrier ) {
    for( auto& item : arenas ) {
        tbb::task_arena& arena = *std::get<arena_pointer>(item).get();
        concurrency_type concurrency = std::get<arena_concurrency>(item);
        tbb::task_arena::priority priority_value = std::get<arena_priority>(item);
        auto& tg = std::get<associated_task_group>(item);

        arena.execute(
            [repeats, &barrier, &tg, priority_value, concurrency]() {
                for( unsigned i = 0; i < repeats * concurrency; ++i ) {
                    tg->run(
                        [&barrier, priority_value](){
                            // Hold until all arenas have their work enqueued so
                            // the scheduler can rebalance threads by priority
                            // before any task does observable work.
                            while( !g_work_submitted.load(std::memory_order_acquire) )
                                utils::yield();
                            // Record which arena (identified by priority) this
                            // task ran in; g_task_num hands out unique slots.
                            g_task_info[g_task_num++] = priority_value;
                            barrier.wait();
                        }
                    );
                }
            } // arena work submission functor
        );
    }
}
163 
wait_work_completion(std::vector<arena_info> & arenas,std::size_t max_num_threads,unsigned overall_tasks_num)164 void wait_work_completion(
165     std::vector<arena_info>& arenas, std::size_t max_num_threads, unsigned overall_tasks_num )
166 {
167     if( max_num_threads > 1 )
168         while( g_task_num < overall_tasks_num )
169             utils::yield();
170 
171     for( auto& item : arenas ) {
172         tbb::task_arena& arena = *std::get<arena_pointer>(item).get();
173         auto& tg = std::get<associated_task_group>(item);
174         arena.execute( [&tg]() { tg->wait(); } );
175     }
176     CHECK_MESSAGE(g_task_num == overall_tasks_num, "Not all tasks were executed.");
177 }
178 
// Main scenario: create one arena per priority (high/normal/low), size their
// concurrencies so that only the first `progressing_arenas_num` (highest-
// priority) arenas can consume all worker threads, submit identical work to
// every arena, and then verify from the execution log that tasks from the
// higher-priority "progressing" arenas ran before tasks from the rest.
void test() {

    const std::size_t max_num_threads = utils::get_platform_max_threads();

    // +1 keeps the external (master) thread from stealing a worker slot.
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, max_num_threads + 1);
    concurrency_type signed_max_num_threads = static_cast<int>(max_num_threads);
    if (1 == max_num_threads) {
        // Skipping workerless case
        return;
    }

    INFO( "max_num_threads = " << max_num_threads );
    // TODO: iterate over threads to see that the work is going on in low priority arena.

    const int min_arena_concurrency = 2; // implementation detail

    tbb::task_arena::priority high   = tbb::task_arena::priority::high;
    tbb::task_arena::priority normal = tbb::task_arena::priority::normal;
    tbb::task_arena::priority low    = tbb::task_arena::priority::low;

    // TODO: use vector or std::array of priorities instead of the c-style array.

    // TODO: consider extending priorities to have more than three arenas.

    tbb::task_arena::priority priorities[] = {high, normal, low}; // keep it sorted
    const unsigned priorities_num = sizeof(priorities) / sizeof(priorities[0]);
    const unsigned overall_arenas_num = priorities_num;

    std::vector<arena_info> arenas;

    // Iterate progressing_arenas_num over 1..overall_arenas_num.
    std::vector<unsigned> progressing_arenas( overall_arenas_num, 0 );
    std::iota( progressing_arenas.begin(), progressing_arenas.end(), 1 );

    for( const auto& progressing_arenas_num : progressing_arenas ) {

        INFO( "progressing_arenas_num = " << progressing_arenas_num );

        // TODO: consider populating vector with arenas in separate function.
        unsigned adjusted_progressing_arenas = progressing_arenas_num;

        arenas.clear();
        g_task_num = 0;

        // Split the worker pool evenly (rounding up) among the requested
        // progressing arenas; the min-concurrency clamp may reduce how many
        // arenas can actually make progress simultaneously.
        concurrency_type projected_concurrency =
            (signed_max_num_threads + progressing_arenas_num - 1) / progressing_arenas_num;
        projected_concurrency = std::max(min_arena_concurrency, projected_concurrency); // implementation detail
        adjusted_progressing_arenas = signed_max_num_threads / projected_concurrency;

        int threads_left = signed_max_num_threads;

        // Instantiate arenas with necessary concurrency so that progressing arenas consume all
        // available threads.
        for( unsigned arena_idx = 0; arena_idx < overall_arenas_num; ++arena_idx ) {
            tbb::task_arena::priority a_priority = priorities[arena_idx];

            concurrency_type concurrency = projected_concurrency;
            concurrency_type actual_concurrency = projected_concurrency;
            if( threads_left < actual_concurrency ||
                arena_idx == adjusted_progressing_arenas - 1 ) // give all remaining threads to last progressing arena
            {
                concurrency = actual_concurrency = threads_left;
            }

            threads_left -= actual_concurrency;

            // Arenas beyond the progressing set get automatic concurrency;
            // they are expected to run only after the progressing arenas drain.
            if( !actual_concurrency ) {
                concurrency = tbb::task_arena::automatic;
                actual_concurrency = signed_max_num_threads;
            }
            actual_concurrency = std::max( min_arena_concurrency, actual_concurrency ); // implementation detail

            tbb::task_arena* arena = allocate_and_construct_arena(concurrency, a_priority);
            arenas.push_back(
                std::make_tuple(
                    std::unique_ptr<tbb::task_arena>(arena),
                    actual_concurrency,
                    a_priority,
                    std::unique_ptr<tbb::task_group>(new tbb::task_group)
                )
            );
        }

        // Rotate so work submission does not start with the highest-priority
        // arena, ruling out submission order as the cause of precedence.
        std::rotate( arenas.begin(), arenas.begin() + progressing_arenas_num - 1, arenas.end() );

        const unsigned repeats = 10;

        unsigned overall_tasks_num = 0;
        for( auto& item : arenas )
            overall_tasks_num += std::get<arena_concurrency>(item) * repeats;

        prepare_logging_data( g_task_info, overall_tasks_num );

        g_work_submitted = false;

        utils::SpinBarrier barrier{ max_num_threads };
        submit_work( arenas, repeats, barrier );

        // Release all tasks only after every arena has its work enqueued.
        g_work_submitted = true;

        wait_work_completion( arenas, max_num_threads, overall_tasks_num );

        // Per-priority count of tasks that ran in the fuzzy start/end windows
        // and thus must be excluded from the strict later-epoch accounting.
        std::map<tbb::task_arena::priority, unsigned> wasted_tasks;

        // priorities[0 .. adjusted_progressing_arenas) are the progressing set.
        tbb::task_arena::priority* end_ptr = priorities + adjusted_progressing_arenas;

        {
            // First epoch - check progressing arenas only
            unsigned overall_progressing_arenas_tasks_num = 0;
            std::map<tbb::task_arena::priority, unsigned> per_priority_tasks_num;

            // Due to indeterministic submission of tasks in the beginning, count tasks priorities up
            // to additional epoch. Assume threads are rebalanced once the work is submitted.
            unsigned last_task_idx = std::min((repeats + 1) * unsigned(max_num_threads), overall_tasks_num);
            for( unsigned i = 0; i < last_task_idx; ++i ) {
                tbb::task_arena::priority p = g_task_info[i];
                ++per_priority_tasks_num[p];

                // Count the task if its priority belongs to the progressing set.
                overall_progressing_arenas_tasks_num += (int)(
                    end_ptr != std::find(priorities, end_ptr, p)
                );

                if( i < max_num_threads || i >= repeats * max_num_threads )
                    ++wasted_tasks[p];
            }

            unsigned expected_overall_progressing_arenas_tasks_num = 0;
            for( unsigned i = 0; i < adjusted_progressing_arenas; ++i ) {
                tbb::task_arena::priority p = priorities[i];
                concurrency_type concurrency = 0;
                for( auto& item : arenas ) {
                    if( std::get<arena_priority>(item) == p ) {
                        concurrency = std::get<arena_concurrency>(item);
                        break;
                    }
                }
                unsigned expected_tasks_num = repeats * concurrency;

                CHECK_MESSAGE( expected_tasks_num == per_priority_tasks_num[p],
                               "Unexpected number of executed tasks in arena with index " << i << " and concurrency = " << concurrency ) ;

                expected_overall_progressing_arenas_tasks_num += expected_tasks_num;
            }
            CHECK_MESSAGE(
                expected_overall_progressing_arenas_tasks_num == overall_progressing_arenas_tasks_num,
                "Number of tasks for progressing arenas mismatched."
            );
        }
        {
            // Other epochs - check remaining arenas
            std::map<tbb::task_arena::priority, unsigned> per_priority_tasks_num;

            std::size_t lower_priority_start = (repeats + 1) * max_num_threads;
            for( std::size_t i = lower_priority_start; i < overall_tasks_num; ++i )
                ++per_priority_tasks_num[ g_task_info[i] ];

            for( auto& e : per_priority_tasks_num ) {
                auto priority = e.first;
                auto tasks_num = e.second;

                // Only non-progressing priorities may appear in the tail.
                auto priorities_it = std::find( end_ptr, priorities + priorities_num, priority );
                CHECK_MESSAGE( priorities_it != priorities + priorities_num,
                               "Tasks from prioritized arena got deferred." );

                auto it = std::find_if(
                    arenas.begin(), arenas.end(),
                    [priority](arena_info& info) {
                        return std::get<arena_priority>(info) == priority;
                    }
                );
                auto per_arena_tasks_num = repeats * std::get<arena_concurrency>(*it);
                CHECK_MESSAGE(
                    tasks_num == per_arena_tasks_num - wasted_tasks[priority],
                    "Incorrect number of tasks from deferred (non-progressing) arenas were executed."
                );
            }
        } // Other epochs
    } // loop over simultaneously progressing arenas

    INFO( "Done\n" );
}
359 
360 } // namespace HighPriorityArenasTakeExecutionPrecedence
361 
362 
363 // TODO: nested arena case
364 //! Test for setting a priority to arena
365 //! \brief \ref requirement
//! Checks that when arenas of different priorities compete for worker threads,
//! the higher-priority arenas' tasks execute first.
TEST_CASE("Arena priorities") {
    HighPriorityArenasTakeExecutionPrecedence::test();
}
369