/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"
#include "common/utils.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"
#include "common/utils_concurrency_limit.h"
#include "common/cpu_usertime.h"

#include "tbb/task.h"
#include "tbb/task_group.h"
#include "tbb/parallel_for.h"
#include "tbb/cache_aligned_allocator.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_vector.h"

#include <atomic>
#include <thread>

//! \file test_task.cpp
//! \brief Test for [internal] functionality

struct EmptyBody {
    void operator()() const {}
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

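// Test task that counts how many times execute() and cancel() are invoked (per Body
// instantiation), runs the user-provided Body inside execute(), and releases the
// associated wait_context on both paths.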
template <typename Body = EmptyBody>
class CountingTask : public tbb::detail::d1::task {
public:
    CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}

    CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}

    task* execute( tbb::detail::d1::execution_data& ) override {
        ++my_execute_counter;
        my_body();
        my_wait.release();
        return nullptr;
    }

    task* cancel( tbb::detail::d1::execution_data& ) override {
        ++my_cancel_counter;
        my_wait.release();
        return nullptr;
    }

    static void reset() {
        my_execute_counter = 0;
        my_cancel_counter = 0;
    }

    static std::size_t execute_counter() { return my_execute_counter; }
    static std::size_t cancel_counter() { return my_cancel_counter; }

private:
    Body my_body;
    tbb::detail::d1::wait_context& my_wait;

    static std::atomic<std::size_t> my_execute_counter;
    static std::atomic<std::size_t> my_cancel_counter;
}; // class CountingTask


#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4702 is back

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);

#if TBB_USE_EXCEPTIONS
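// Helper for the "Test cancellation on exception" test case: repeatedly runs a task whose
// body throws and checks the execute/cancel counters, optionally resetting the context
// after each iteration so that the task can run again.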
void test_cancellation_on_exception( bool reset_ctx ) {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    auto throw_body = [] {
        throw 1;
    };
    CountingTask<decltype(throw_body)> task(throw_body, wait);

    constexpr std::size_t iter_counter = 1000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        try {
            tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
        } catch(int ex) {
            REQUIRE(ex == 1);
        }
        if (reset_ctx) {
            test_context.reset();
        }
        wait.reserve(1);
    }
    wait.release(1);

    REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
    REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception was thrown");
    task.reset();
}
#endif // TBB_USE_EXCEPTIONS

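//! Testing that an external thread does not actively spin while waiting for a task_group (see TestCPUUserTime)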
//! \brief \ref error_guessing
TEST_CASE("External threads sleep") {
    if (utils::get_platform_max_threads() < 2) return;
    utils::SpinBarrier barrier(2);

    tbb::task_group test_gr;

    test_gr.run([&] {
        barrier.wait();
        TestCPUUserTime(2);
    });

    barrier.wait();

    test_gr.wait();
}

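//! Testing that execute_and_wait() runs the same task object the expected number of times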
//! \brief \ref error_guessing
TEST_CASE("Test that task was executed p times") {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    CountingTask<> test_task(wait);

    constexpr std::size_t iter_counter = 10000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
        wait.reserve(1);
    }

    wait.release(1);

    REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed the expected number of times");
    REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
    CountingTask<>::reset();
}

#if TBB_USE_EXCEPTIONS
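//! Testing task cancellation when the task body throws, with and without resetting the context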
//! \brief \ref error_guessing
TEST_CASE("Test cancellation on exception") {
    test_cancellation_on_exception(/*reset_ctx = */true);
    test_cancellation_on_exception(/*reset_ctx = */false);
}
#endif // TBB_USE_EXCEPTIONS

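//! Testing that all spawned tasks can run concurrently (each task blocks on a shared barrier)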
//! \brief \ref error_guessing
TEST_CASE("Simple test parallelism usage") {
    std::size_t threads_num = utils::get_platform_max_threads();
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_counter = 100;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        for (std::size_t j = 0; j < threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }
        tbb::detail::d1::wait(wait, test_context);
        wait.reserve(threads_num);
    }
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}

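//! Testing the internal task API running concurrently with parallel_for calls made from separate std::threads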
//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with parallel_for") {
    std::size_t task_threads_num = utils::get_platform_max_threads();
    utils::SpinBarrier barrier(task_threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    std::size_t pfor_iter_count = 10000;
    std::atomic<std::size_t> pfor_counter(0);

    auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
                          [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
                              for (auto it = range.begin(); it != range.end(); ++it) {
                                  ++pfor_counter;
                              }
                          }
        );
    };

    tbb::detail::d1::wait_context wait(task_threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_count = 10;
    constexpr std::size_t pfor_threads_num = 4;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> pfor_threads;

        for (std::size_t j = 0; j < task_threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }

        for (std::size_t k = 0; k < pfor_threads_num; ++k) {
            pfor_threads.emplace_back(parallel_for_func);
        }

        tbb::detail::d1::wait(wait, test_context);

        for (auto& thread : pfor_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        wait.reserve(task_threads_num);
    }
    wait.release(task_threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread did not finish");
    task_type::reset();
}

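//! Testing spawning tasks into the same context from multiple external threads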
//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
    std::size_t threads_num = utils::get_platform_max_threads();
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
        tbb::detail::d1::spawn(vector_test_task[idx], test_context);
    };

    constexpr std::size_t iter_count = 10;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> threads;

        for (std::size_t k = 0; k < threads_num - 1; ++k) {
            threads.emplace_back(thread_func, k);
        }

        for (auto& thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
        wait.reserve(threads_num);
    }
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}

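// Task body that, on each execution, spawns the next batch of up to 7 tasks from a shared
// pool, so the amount of pending work grows while it is being processed.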
class SpawningTaskBody;

using SpawningTask = CountingTask<SpawningTaskBody>;

class SpawningTaskBody {
public:
    using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;

    SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
        : my_task_pool(task_pool), my_test_ctx(test_ctx) {}

    void operator()() const {
        std::size_t delta = 7;
        std::size_t start_idx = my_current_task.fetch_add(delta);

        if (start_idx < my_task_pool.size()) {
            for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
                tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
            }
        }
    }
private:
    task_pool_type& my_task_pool;
    tbb::task_group_context& my_test_ctx;
    static std::atomic<std::size_t> my_current_task;
}; // class SpawningTaskBody

std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);

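//! Testing waiting on a task tree that keeps spawning new tasks while it is being executed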
//! \brief \ref error_guessing
TEST_CASE("Actively adding tasks") {
    std::size_t task_number = 500 * utils::get_platform_max_threads();

    tbb::detail::d1::wait_context wait(task_number + 1);
    tbb::task_group_context test_context;

    SpawningTaskBody::task_pool_type task_pool;

    SpawningTaskBody task_body{task_pool, test_context};
    for (std::size_t i = 0; i < task_number; ++i) {
        task_pool.emplace_back(task_body, wait);
    }

    SpawningTask first_task(task_body, wait);
    tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);

    REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right?
    REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
}

#if __TBB_RESUMABLE_TASKS

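// Task that performs a parallel_for, releases its wait_context, and then resumes the
// suspend point captured at construction.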
struct suspended_task : public tbb::detail::d1::task {

    suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
        : my_suspend_tag(tag), my_wait(wait)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
            [] (const tbb::blocked_range<std::size_t>& range) {
                // Do some busy work
                std::atomic<int> sum{};
                for (auto it = range.begin(); it != range.end(); ++it) {
                    ++sum;
                }
            },
            tbb::static_partitioner{}
        );

        my_wait.release();
        tbb::task::resume(my_suspend_tag);
        return nullptr;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::task::suspend_point my_suspend_tag;
    tbb::detail::d1::wait_context& my_wait;
};

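//! Testing tbb::task::suspend()/resume() invoked inside an isolated region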
//! \brief \ref error_guessing
TEST_CASE("Isolation + resumable tasks") {
    std::atomic<int> suspend_flag{};
    tbb::task_group_context test_context;

    std::atomic<int> suspend_count{};
    std::atomic<int> resume_count{};

    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
        [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
            int ticket = 0;
            for (auto it = range.begin(); it != range.end(); ++it) {
                ticket = suspend_flag++;
            }

            if (ticket % 5 == 0) {
                std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
                tbb::detail::d1::wait_context wait(1);
                ++suspend_count;
                tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
                    tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) {
                        test_task.emplace_back(tag, wait);
                        tbb::detail::d1::spawn(test_task[0], test_context);
                    });
                });
                tbb::detail::d1::wait(wait, test_context);
                ++resume_count;
            }
        }
    );

    CHECK(suspend_count == resume_count);
}

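// Task that does some dummy work, resumes the recorded suspend point once the resume flag
// is set, and returns the next task from a shared pool through the bypass mechanism.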
struct bypass_task : public tbb::detail::d1::task {
    using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;

    bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
                std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
        : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        utils::doDummyWork(10000);

        int expected = 1;
        if (my_resume_flag.compare_exchange_strong(expected, 2)) {
            tbb::task::resume(my_suspend_tag);
        }

        std::size_t ticket = my_current_task++;
        task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;

        if (!next && my_resume_flag != 2) {
            // In rare cases, all tasks may be executed before the suspend happens,
            // so wait for the suspend before leaving.
            utils::SpinWaitWhileEq(my_resume_flag, 0);
            expected = 1;
            if (my_resume_flag.compare_exchange_strong(expected, 2)) {
                tbb::task::resume(my_suspend_tag);
            }
        }

        my_wait.release();
        return next;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::detail::d1::wait_context& my_wait;
    task_pool_type& my_task_pool;
    std::atomic<int>& my_resume_flag;
    tbb::task::suspend_point& my_suspend_tag;
    static std::atomic<int> my_current_task;
};

std::atomic<int> bypass_task::my_current_task(0);

thread_local int test_tls = 0;

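//! Testing task bypass combined with suspend/resume: the suspended external thread must return on the same thread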
//! \brief \ref error_guessing
TEST_CASE("Bypass suspended by resume") {
    std::size_t task_number = 500 * utils::get_platform_max_threads();
    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number + 1);

    test_tls = 1;

    std::atomic<int> resume_flag{0};
    tbb::task::suspend_point test_suspend_tag;

    std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
    }

    for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
        std::size_t ticket = bypass_task::my_current_task++;
        if (ticket < test_task_pool.size()) {
            tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
        }
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
            test_suspend_tag = tag;
            resume_flag = 1;
        });
    };
    using task_type = CountingTask<decltype(suspend_func)>;
    task_type suspend_task(suspend_func, wait);

    tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
    CHECK(bypass_task::my_current_task >= test_task_pool.size());
    REQUIRE_MESSAGE(test_tls == 1, "execute_and_wait returned on a different thread");
}

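//! Testing critical task submission mixed with suspend/resume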
//! \brief \ref error_guessing
TEST_CASE("Critical tasks + resume") {
    std::size_t task_number = 500 * utils::get_platform_max_threads();

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena;

    test_arena.initialize();

    std::atomic<bool> resume_flag{};
    tbb::task::suspend_point test_suspend_tag;

    auto task_body = [&resume_flag, &test_suspend_tag] {
        // Do some work
        utils::doDummyWork(1000);

        if (resume_flag.exchange(false)) {
            tbb::task::resume(test_suspend_tag);
        }
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_tasks.emplace_back(task_body, wait);
    }

    for (std::size_t i = 0; i < task_number / 2; ++i) {
        submit(test_tasks[i], test_arena, test_context, true);
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
            test_suspend_tag = tag;
            resume_flag.store(true, std::memory_order_release);
        });
    };
    using suspend_task_type = CountingTask<decltype(suspend_func)>;
    suspend_task_type suspend_task(suspend_func, wait);

    submit(suspend_task, test_arena, test_context, true);

    test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] {
        tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] {
            tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1),
                [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
                    for (std::size_t i = range.begin(); i != range.end(); ++i) {
                        submit(test_tasks[i], test_arena, test_context, true);
                    }
                });
        });
    });

    tbb::detail::d1::wait(wait, test_context);
}

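//! Stress testing: mixing arena enqueue, spawn, and submit over many iterations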
//! \brief \ref error_guessing
TEST_CASE("Stress testing") {
    std::size_t task_number = utils::get_platform_max_threads();

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena;

    test_arena.initialize();

    auto task_body = [] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
            utils::doDummyWork(100);
        });
    };
    using task_type = CountingTask<decltype(task_body)>;

    std::size_t iter_counter = 20;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
        for (std::size_t i = 0; i < iter_counter; ++i) {

            for (std::size_t j = 0; j < task_number; ++j) {
                test_arena.enqueue(task_body);
            }

            for (std::size_t j = 0; j < task_number / 2; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            for (std::size_t j = task_number / 2; j < task_number; ++j) {
                submit(test_tasks[j], test_arena, test_context, true);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        }
        wait.release(task_number);
    });

    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}

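//! Testing that worker threads do not consume CPU time while all of them are suspended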
//! \brief \ref error_guessing
TEST_CASE("All workers sleep") {
    std::size_t thread_number = utils::get_platform_max_threads();
    tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;

    tbb::task_group test_gr;

    utils::SpinBarrier barrier(thread_number);
    auto resumable_task = [&] {
        barrier.wait();
        tbb::task::suspend([&] (tbb::task::suspend_point sp) {
            suspend_points.push_back(sp);
            barrier.wait();
        });
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        test_gr.run(resumable_task);
    }

    barrier.wait();
    barrier.wait();
    TestCPUUserTime(thread_number);

    for (auto sp : suspend_points)
        tbb::task::resume(sp);
    test_gr.wait();
}

#endif // __TBB_RESUMABLE_TASKS

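//! Testing task_arena::enqueue while a global_control limits parallelism to a single thread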
//! \brief \ref error_guessing
TEST_CASE("Enqueue with exception") {
    std::size_t task_number = 500 * utils::get_platform_max_threads();

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};

    test_arena.initialize();

    auto task_body = [] {
        utils::doDummyWork(100);
    };

    std::atomic<bool> end_flag{false};
    auto check_body = [&end_flag] {
        end_flag.store(true, std::memory_order_relaxed);
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(task_body);
        // Initialize implicit arena
        tbb::parallel_for(0, 1, [] (int) {});
        tbb::task_arena test_arena2(tbb::task_arena::attach{});
        test_arena2.enqueue(task_body);
    }

    constexpr std::size_t iter_count = 10;
    for (std::size_t k = 0; k < iter_count; ++k) {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(check_body);

        while (!end_flag.load(std::memory_order_relaxed)) ;

        utils::Sleep(1);
        end_flag.store(false, std::memory_order_relaxed);

        test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
            for (std::size_t j = 0; j < task_number; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        });
    }
    wait.release(task_number);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}

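// Task that resubmits itself to the arena until its counter is exhausted, keeping the workers busy.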
struct resubmitting_task : public tbb::detail::d1::task {
    tbb::task_arena& my_arena;
    tbb::task_group_context& my_ctx;
    std::atomic<int> counter{100000};

    resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
    {}

    tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override {
        if (counter-- > 0) {
            submit(*this, my_arena, my_ctx, true);
        }
        return nullptr;
    }

    tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override {
        FAIL("The function should never be called.");
        return nullptr;
    }
};

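//! Testing that submitted critical tasks complete while workers are saturated and the submitting threads run at elevated priority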
//! \brief \ref error_guessing
TEST_CASE("Test with priority inversion") {
    if (!utils::can_change_thread_priority()) return;

    std::size_t thread_number = utils::get_platform_max_threads();
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);

    tbb::task_arena test_arena(2 * thread_number, thread_number);
    test_arena.initialize();
    utils::pinning_observer obsr(test_arena);
    CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");

    std::size_t critical_task_counter = 1000 * thread_number;
    std::atomic<std::size_t> task_counter{0};

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(critical_task_counter);

    auto critical_work = [&] {
        utils::doDummyWork(10);
    };

    using suspend_task_type = CountingTask<decltype(critical_work)>;
    suspend_task_type critical_task(critical_work, wait);

    auto high_priority_thread_func = [&] {
        // Increase the external thread's priority
        utils::increase_thread_priority();
        // Pin the external thread by joining the arena
        test_arena.execute([]{});
        while (task_counter++ < critical_task_counter) {
            submit(critical_task, test_arena, test_context, true);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    };

    resubmitting_task worker_task(test_arena, test_context);
    // Warm up: take the first core on execute
    utils::SpinBarrier barrier(thread_number + 1);
    test_arena.execute([&] {
        tbb::parallel_for(std::size_t(0), thread_number + 1, [&] (std::size_t&) {
            barrier.wait();
            submit(worker_task, test_arena, test_context, true);
        });
    });

    std::vector<std::thread> high_priority_threads;
    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads.emplace_back(high_priority_thread_func);
    }

    utils::increase_thread_priority();
    while (task_counter++ < critical_task_counter) {
        submit(critical_task, test_arena, test_context, true);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    tbb::detail::d1::wait(wait, test_context);

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads[i].join();
    }
    obsr.observe(false);
}
794