xref: /oneTBB/test/tbb/test_task.cpp (revision 106e08a4)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/dummy_body.h"
20 #include "common/spin_barrier.h"
21 #include "common/utils_concurrency_limit.h"
22 #include "common/cpu_usertime.h"
23 
24 #include "tbb/task.h"
25 #include "tbb/task_group.h"
26 #include "tbb/parallel_for.h"
27 #include "tbb/cache_aligned_allocator.h"
28 #include "tbb/global_control.h"
29 #include "tbb/concurrent_vector.h"
30 
31 #include <atomic>
32 #include <thread>
33 #include <thread>
34 
35 //! \file test_task.cpp
36 //! \brief Test for [internal] functionality
//! No-op callable: the default Body for CountingTask instances.
struct EmptyBody {
    void operator()() const {
        // intentionally empty
    }
};
40 
#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

// Test task that records how many times tasks of a given Body specialization
// were executed vs. canceled. The counters are static members, so they
// aggregate across ALL instances sharing the same Body type; tests that reuse
// a specialization must call reset() between independent runs. Each test's
// lambda has a unique closure type, so distinct tests get distinct counters.
template <typename Body = EmptyBody>
class CountingTask : public tbb::detail::d1::task {
public:
    CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}

    // Default-constructs the Body (requires Body to be default-constructible).
    CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}

    // Scheduler entry point: count the call, run the body, then signal the
    // wait_context so a waiting thread can make progress.
    task* execute( tbb::detail::d1::execution_data& ) override {
        ++my_execute_counter;
        my_body();
        my_wait.release();
        return nullptr;
    }

    // Called instead of execute() when the task's group was canceled (e.g.
    // after an exception); still releases the wait_context to avoid deadlock.
    task* cancel( tbb::detail::d1::execution_data& ) override {
        ++my_cancel_counter;
        my_wait.release();
        return nullptr;
    }

    // Zeroes both counters; call between independent uses of the same
    // specialization.
    static void reset() {
        my_execute_counter = 0;
        my_cancel_counter = 0;
    }

    static std::size_t execute_counter() { return my_execute_counter; }
    static std::size_t cancel_counter() { return my_cancel_counter; }

private:
    Body my_body;
    tbb::detail::d1::wait_context& my_wait;

    // Per-specialization counters shared by every instance of this Body type.
    static std::atomic<std::size_t> my_execute_counter;
    static std::atomic<std::size_t> my_cancel_counter;
}; // class CountingTask


#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4702 is back

// Out-of-class definitions for the per-specialization static counters.
template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);
93 
#if TBB_USE_EXCEPTIONS
// Verifies that a task whose body throws leaves its group canceled: without a
// context reset only the first submission reaches execute(), while every
// submission after the exception is routed to cancel().
void test_cancellation_on_exception( bool reset_ctx ) {
    tbb::detail::d1::wait_context wait_ctx(1);
    tbb::task_group_context ctx;
    auto thrower = [] {
        throw 1;
    };
    using task_t = CountingTask<decltype(thrower)>;
    task_t probe(thrower, wait_ctx);

    constexpr std::size_t n_iters = 1000;
    for (std::size_t iter = 0; iter < n_iters; ++iter) {
        try {
            tbb::detail::d1::execute_and_wait(probe, ctx, wait_ctx, ctx);
        } catch(int ex) {
            REQUIRE(ex == 1);
        }
        // Only a reset context lets the next submission execute again.
        if (reset_ctx) {
            ctx.reset();
        }
        wait_ctx.reserve(1);
    }
    // Undo the extra reservation made on the final loop iteration.
    wait_ctx.release(1);

    REQUIRE_MESSAGE(task_t::execute_counter() == (reset_ctx ? n_iters : 1), "Some task was not executed");
    REQUIRE_MESSAGE(task_t::cancel_counter() == n_iters, "Some task was not canceled after the exception occurs");
    task_t::reset();
}
#endif // TBB_USE_EXCEPTIONS
122 
//! \brief \ref error_guessing
TEST_CASE("External threads sleep") {
    // Needs at least one worker thread besides the external (test) thread.
    if (utils::get_platform_max_threads() < 2) return;
    utils::SpinBarrier barrier(2);

    tbb::task_group test_gr;

    test_gr.run([&] {
        // Rendezvous with the external thread before sampling CPU time.
        barrier.wait();
        // Checks CPU user time assuming 2 active threads (common/cpu_usertime.h).
        // NOTE(review): presumably verifies idle threads block rather than spin — confirm.
        TestCPUUserTime(2);
    });

    barrier.wait();

    test_gr.wait();
}
139 
140 //! \brief \ref error_guessing
141 TEST_CASE("Test that task was executed p times") {
142     tbb::detail::d1::wait_context wait(1);
143     tbb::task_group_context test_context;
144     CountingTask<> test_task(wait);
145 
146     constexpr std::size_t iter_counter = 10000;
147     for (std::size_t i = 0; i < iter_counter; ++i) {
148         tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
149         wait.reserve(1);
150     }
151 
152     wait.release(1);
153 
154     REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times");
155     REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
156     CountingTask<>::reset();
157 }
158 
#if TBB_USE_EXCEPTIONS
//! \brief \ref error_guessing
TEST_CASE("Test cancellation on exception") {
    // Exercise both modes: with a context reset between iterations, then without.
    for (bool reset_ctx : { true, false }) {
        test_cancellation_on_exception(reset_ctx);
    }
}
#endif // TBB_USE_EXCEPTIONS
166 
167 //! \brief \ref error_guessing
168 TEST_CASE("Simple test parallelism usage") {
169     std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
170     utils::SpinBarrier barrier(threads_num);
171 
172     auto barrier_wait = [&barrier] {
173         barrier.wait();
174     };
175 
176     tbb::detail::d1::wait_context wait(threads_num);
177     tbb::detail::d1::task_group_context test_context;
178     using task_type = CountingTask<decltype(barrier_wait)>;
179 
180     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
181 
182     constexpr std::size_t iter_counter = 100;
183     for (std::size_t i = 0; i < iter_counter; ++i) {
184         for (std::size_t j = 0; j < threads_num; ++j) {
185             tbb::detail::d1::spawn(vector_test_task[j], test_context);
186         }
187         tbb::detail::d1::wait(wait, test_context);
188         wait.reserve(threads_num);
189     }
190     wait.release(threads_num);
191 
192     REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
193     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
194     task_type::reset();
195 }
196 
//! \brief \ref error_guessing
// Spawned barrier tasks must all run concurrently while external std::threads
// drive independent parallel_for loops; checks neither starves the other.
TEST_CASE("Test parallelism usage with parallel_for") {
    std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(task_threads_num);

    // Each spawned task blocks on the barrier until all of them are running.
    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    std::size_t pfor_iter_count = 10000;
    std::atomic<std::size_t> pfor_counter(0);

    // Work for the external threads: count every parallel_for iteration.
    auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
                          [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
                              for (auto it = range.begin(); it != range.end(); ++it) {
                                  ++pfor_counter;
                              }
                           }
        );
    };

    tbb::detail::d1::wait_context wait(task_threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_count = 10;
    constexpr std::size_t pfor_threads_num = 4;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> pfor_threads;

        // Spawn the barrier tasks first, then start the competing threads.
        for (std::size_t j = 0; j < task_threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }

        for (std::size_t k = 0; k < pfor_threads_num; ++k) {
            pfor_threads.emplace_back(parallel_for_func);
        }

        tbb::detail::d1::wait(wait, test_context);

        for (auto& thread : pfor_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        wait.reserve(task_threads_num);
    }
    // Undo the extra reservation made on the final loop iteration.
    wait.release(task_threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished");
    task_type::reset();
}
254 
//! \brief \ref error_guessing
// Tasks are spawned from freshly created external threads (joined before the
// wait), while the last one is executed in-place via execute_and_wait.
TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    // All spawned tasks plus the in-place task rendezvous on the barrier.
    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    // Each external thread spawns exactly one task from the shared pool.
    auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
        tbb::detail::d1::spawn(vector_test_task[idx], test_context);
    };

    constexpr std::size_t iter_count = 10;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> threads;

        // threads_num - 1 tasks come from external threads ...
        for (std::size_t k = 0; k < threads_num - 1; ++k) {
            threads.emplace_back(thread_func, k);
        }

        for (auto& thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        // ... and the last one runs on this thread, which then waits for all.
        tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
        wait.reserve(threads_num);
    }
    // Undo the extra reservation made on the final loop iteration.
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}
296 
class SpawningTaskBody;

// A CountingTask whose body spawns further tasks from a shared pool.
using SpawningTask = CountingTask<SpawningTaskBody>;

// Body that, on each invocation, claims the next `delta` tasks from a shared
// pool (via an atomic cursor) and spawns them — so the task graph keeps
// growing while it is being executed.
class SpawningTaskBody {
public:
    using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;

    SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
        : my_task_pool(task_pool), my_test_ctx(test_ctx) {}

    void operator()() const {
        std::size_t delta = 7;
        // fetch_add hands each running task a disjoint slice
        // [start_idx, start_idx + delta) of the pool to spawn.
        std::size_t start_idx = my_current_task.fetch_add(delta);

        if (start_idx < my_task_pool.size()) {
            for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
                tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
            }
        }
    }
private:
    task_pool_type& my_task_pool;
    tbb::task_group_context& my_test_ctx;
    // Shared cursor over the pool; static, so NOT reset between test runs.
    static std::atomic<std::size_t> my_current_task;
}; // class SpawningTaskBody

std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);
325 
//! \brief \ref error_guessing
// One seed task transitively spawns the whole pool while workers drain it;
// every task must execute exactly once despite the pool growing "live".
TEST_CASE("Actively adding tasks") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    // +1 accounts for first_task in addition to the pool tasks.
    tbb::detail::d1::wait_context wait(task_number + 1);
    tbb::task_group_context test_context;

    SpawningTaskBody::task_pool_type task_pool;

    // The body holds a reference to task_pool; the pool is fully populated
    // before the first task runs, so no reallocation happens during spawning.
    SpawningTaskBody task_body{task_pool, test_context};
    for (std::size_t i = 0; i < task_number; ++i) {
        task_pool.emplace_back(task_body, wait);
    }

    SpawningTask first_task(task_body, wait);
    tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);

    REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // first_task plus every pool task, each exactly once
    REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
}
346 
#if __TBB_RESUMABLE_TASKS
// Task that performs some parallel work and then resumes the coroutine that
// was suspended at construction time (my_suspend_tag).
struct suspended_task : public tbb::detail::d1::task {

    suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
        : my_suspend_tag(tag), my_wait(wait)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
            [] (const tbb::blocked_range<std::size_t>& range) {
                // Make some heavy work
                std::atomic<int> sum{};
                for (auto it = range.begin(); it != range.end(); ++it) {
                    ++sum;
                }
            },
            tbb::static_partitioner{}
        );

        // Release the waiter first, then hand control back to the suspended
        // execution context.
        my_wait.release();
        tbb::task::resume(my_suspend_tag);
        return nullptr;
    }

    // This test never cancels its context, so cancel() must be unreachable.
    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::task::suspend_point my_suspend_tag;
    tbb::detail::d1::wait_context& my_wait;
};
379 
//! \brief \ref error_guessing
// Suspends some parallel_for iterations inside an isolation region and
// resumes them from a separately spawned task; every suspend must be matched
// by a resume.
TEST_CASE("Isolation + resumable tasks") {
    std::atomic<int> suspend_flag{};
    tbb::task_group_context test_context;

    std::atomic<int> suspend_count{};
    std::atomic<int> resume_count{};

    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
        [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
            // The last ticket drawn in this sub-range decides whether to suspend.
            int ticket = 0;
            for (auto it = range.begin(); it != range.end(); ++it) {
                ticket = suspend_flag++;
            }

            if (ticket % 5 == 0) {
                std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
                tbb::detail::d1::wait_context wait(1);
                ++suspend_count;
                // Isolation ensures the suspend/spawn pair cannot pick up
                // unrelated outer work while this iteration is blocked.
                tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
                    tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) {
                        test_task.emplace_back(tag, wait);
                        tbb::detail::d1::spawn(test_task[0], test_context);
                    });
                    }
                );
                tbb::detail::d1::wait(wait, test_context);
                ++resume_count;
            }
        }
    );

    CHECK(suspend_count == resume_count);
}
414 
// Task that chains through a shared pool via the bypass mechanism (returning
// the next task from execute()) and resumes a suspended context exactly once.
// my_resume_flag protocol: 0 = not suspended yet, 1 = suspended and waiting
// for resume, 2 = already resumed (CAS 1->2 elects the single resumer).
struct bypass_task : public tbb::detail::d1::task {
    using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;

    bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
                std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
        : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        utils::doDummyWork(10000);

        // First task to observe the suspend (flag == 1) performs the resume.
        int expected = 1;
        if (my_resume_flag.compare_exchange_strong(expected, 2)) {
            tbb::task::resume(my_suspend_tag);
        }

        // Claim the next pool task and bypass-return it to keep the chain going.
        std::size_t ticket = my_current_task++;
        task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;

        if (!next && my_resume_flag != 2) {
            // Rarely all tasks can be executed before the suspend.
            // So, wait for the suspend before leaving.
            utils::SpinWaitWhileEq(my_resume_flag, 0);
            expected = 1;
            if (my_resume_flag.compare_exchange_strong(expected, 2)) {
                tbb::task::resume(my_suspend_tag);
            }
        }

        my_wait.release();
        return next;
    }

    // The context is never canceled in this test.
    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::detail::d1::wait_context& my_wait;
    task_pool_type& my_task_pool;
    std::atomic<int>& my_resume_flag;
    tbb::task::suspend_point& my_suspend_tag;
    // Shared cursor over the pool; static, so NOT reset between test runs.
    static std::atomic<int> my_current_task;
};

std::atomic<int> bypass_task::my_current_task(0);

// Used to verify the suspending thread is the one that comes back after resume.
thread_local int test_tls = 0;
463 
//! \brief \ref error_guessing
// Drives a bypass chain of tasks while one task suspends; the chain must
// both drain the pool and resume the suspended thread, and the original
// thread (identified via TLS) must be the one that returns from the wait.
TEST_CASE("Bypass suspended by resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number + 1);

    test_tls = 1;

    std::atomic<int> resume_flag{0};
    tbb::task::suspend_point test_suspend_tag;

    std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;

    // Pool fully populated before any spawn — elements hold a reference to
    // the pool itself, so no reallocation may happen afterwards.
    for (std::size_t i = 0; i < task_number; ++i) {
        test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
    }

    // Seed one chain per available thread; each chain continues via bypass.
    for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
        std::size_t ticket = bypass_task::my_current_task++;
        if (ticket < test_task_pool.size()) {
            tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
        }
    }

    // Publishes the suspend point, then flags it for resumption (flag 0 -> 1).
    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
            test_suspend_tag = tag;
            resume_flag = 1;
        });
    };
    using task_type = CountingTask<decltype(suspend_func)>;
    task_type suspend_task(suspend_func, wait);

    tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
    CHECK(bypass_task::my_current_task >= test_task_pool.size());
    REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
}
501 
//! \brief \ref error_guessing
// Floods an arena with critical (submit(..., true)) tasks while one task
// suspends; keeps submitting until some critical task performs the resume.
TEST_CASE("Critical tasks + resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    // Starts at zero; reservations are made explicitly before each batch.
    tbb::detail::d1::wait_context wait{ 0 };

    // The test expects at least one thread in test_arena
    int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads()));
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena);
    tbb::task_arena test_arena(num_threads_in_test_arena);

    test_arena.initialize();

    std::atomic<bool> resume_flag{}, resumed{};
    tbb::task::suspend_point test_suspend_tag;

    // Whichever task wins the exchange performs the single resume.
    auto task_body = [&resume_flag, &resumed, &test_suspend_tag] {
        // Make some work
        utils::doDummyWork(1000);

        if (resume_flag.exchange(false)) {
            tbb::task::resume(test_suspend_tag);
            resumed = true;
        }
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_tasks.emplace_back(task_body, wait);
    }

    // First half goes in as critical tasks before the suspend happens.
    wait.reserve(task_number / 2);
    for (std::size_t i = 0; i < task_number / 2; ++i) {
        submit(test_tasks[i], test_arena, test_context, true);
    }

    // Publishes the suspend point; release ordering pairs with the readers.
    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
            test_suspend_tag = tag;
            resume_flag.store(true, std::memory_order_release);
        });
    };
    using suspend_task_type = CountingTask<decltype(suspend_func)>;
    suspend_task_type suspend_task(suspend_func, wait);

    wait.reserve(1);
    submit(suspend_task, test_arena, test_context, true);

    // Keep re-submitting the second half until some task resumed the
    // suspend point; isolation keeps this loop from executing outer work.
    test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
        tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
            do {
                wait.reserve(task_number / 2);
                tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number),
                    [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
                        for (std::size_t i = range.begin(); i != range.end(); ++i) {
                            submit(test_tasks[i], test_arena, test_context, true);
                        }
                    }
                );
            } while (!resumed);
        });
    });

    // Drain all outstanding reservations inside the arena.
    test_arena.execute([&wait, &test_context] {
        tbb::detail::d1::wait(wait, test_context);
    });
}
572 
//! \brief \ref error_guessing
// Mixes the three submission paths (arena enqueue, spawn, critical submit)
// under load; the counted tasks must all execute, none canceled.
TEST_CASE("Stress testing") {
    std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena;

    test_arena.initialize();

    // Each task itself runs a nested parallel_for to create inner parallelism.
    auto task_body = [] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
            utils::doDummyWork(100);
        });
    };
    using task_type = CountingTask<decltype(task_body)>;

    std::size_t iter_counter = 20;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
        for (std::size_t i = 0; i < iter_counter; ++i) {

            // Uncounted background noise: enqueue plain lambdas ...
            for (std::size_t j = 0; j < task_number; ++j) {
                test_arena.enqueue(task_body);
            }

            // ... while the counted tasks are split between spawn ...
            for (std::size_t j = 0; j < task_number / 2; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            // ... and critical submission.
            for (std::size_t j = task_number / 2; j < task_number; ++j) {
                submit(test_tasks[j], test_arena, test_context, true);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        }
        // Undo the extra reservation made on the final loop iteration.
        wait.release(task_number);
    });


    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}
624 
//! \brief \ref error_guessing
// Suspends all worker threads and checks the process burns no CPU while they
// are parked, then resumes every collected suspend point.
TEST_CASE("All workers sleep") {
    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;

    tbb::task_group test_gr;

    utils::SpinBarrier barrier(thread_number);
    auto resumble_task = [&] {
        // First barrier: all workers are running. Second barrier (inside the
        // suspend callback): all suspend points have been recorded.
        barrier.wait();
        tbb::task::suspend([&] (tbb::task::suspend_point sp) {
            suspend_points.push_back(sp);
            barrier.wait();
        });
    };

    // thread_number - 1 workers suspend; this thread is the barrier's last party.
    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        test_gr.run(resumble_task);
    }

    barrier.wait();
    barrier.wait();
    // With every worker suspended, CPU user time should stay low
    // (common/cpu_usertime.h).
    TestCPUUserTime(thread_number);

    for (auto sp : suspend_points)
        tbb::task::resume(sp);
    test_gr.wait();
}
653 
654 #endif // __TBB_RESUMABLE_TASKS
655 
//! \brief \ref error_guessing
// Exercises enqueue into an oversubscribed arena while parallelism is
// clamped to 1, then repeatedly spawns/waits batches of counted tasks.
// NOTE(review): despite the name, no exception is visibly thrown here —
// presumably the scenario targets internal enqueue error paths; confirm.
TEST_CASE("Enqueue with exception") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    // Arena asks for one slot more than the hardware offers.
    tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};

    test_arena.initialize();

    auto task_body = [] {
        utils::doDummyWork(100);
    };

    // Marker lambda: proves that enqueued work actually ran.
    std::atomic<bool> end_flag{false};
    auto check_body = [&end_flag] {
        end_flag.store(true, std::memory_order_relaxed);
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    {
        // Enqueue into both an explicit and the implicit arena while only a
        // single thread is allowed.
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(task_body);
        // Initialize implicit arena
        tbb::parallel_for(0, 1, [] (int) {});
        tbb::task_arena test_arena2(tbb::task_arena::attach{});
        test_arena2.enqueue(task_body);
    }

    constexpr std::size_t iter_count = 10;
    for (std::size_t k = 0; k < iter_count; ++k) {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(check_body);

        // Busy-wait until the enqueued marker has run.
        while (!end_flag.load(std::memory_order_relaxed)) ;

        utils::Sleep(1);
        end_flag.store(false, std::memory_order_relaxed);

        test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
            for (std::size_t j = 0; j < task_number; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        });
    }
    // Undo the extra reservation made on the final loop iteration.
    wait.release(task_number);


    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}
717 
718 struct resubmitting_task : public tbb::detail::d1::task {
719     tbb::task_arena& my_arena;
720     tbb::task_group_context& my_ctx;
721     std::atomic<int> counter{100000};
722 
723     resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
724     {}
725 
726     tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override {
727         if (counter-- > 0) {
728             submit(*this, my_arena, my_ctx, true);
729         }
730         return nullptr;
731     }
732 
733     tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override {
734         FAIL("The function should never be called.");
735         return nullptr;
736     }
737 };
738 
//! \brief \ref error_guessing
// High-OS-priority external threads submit critical tasks while normal-
// priority workers churn on a self-resubmitting task; all critical tasks
// must still complete (no priority inversion deadlock/starvation).
TEST_CASE("Test with priority inversion") {
    if (!utils::can_change_thread_priority()) return;

    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);

    // Oversized arena (2x slots) with thread_number reserved for externals.
    tbb::task_arena test_arena(2 * thread_number, thread_number);
    test_arena.initialize();
    utils::pinning_observer obsr(test_arena);
    CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");

    std::uint32_t  critical_task_counter = 1000 * thread_number;
    std::atomic<std::size_t> task_counter{0};

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(critical_task_counter);

    auto critical_work = [&] {
        utils::doDummyWork(10);
    };

    using suspend_task_type = CountingTask<decltype(critical_work)>;
    suspend_task_type critical_task(critical_work, wait);

    // Each high-priority thread drip-feeds critical tasks into the arena.
    auto high_priority_thread_func = [&] {
        // Increase external threads priority
        utils::increase_thread_priority();
        // pin external threads
        test_arena.execute([]{});
        while (task_counter++ < critical_task_counter) {
            submit(critical_task, test_arena, test_context, true);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    };

    resubmitting_task worker_task(test_arena, test_context);
    // warm up
    // take first core on execute
    utils::SpinBarrier barrier(thread_number + 1);
    test_arena.execute([&] {
        tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t&) {
            barrier.wait();
            submit(worker_task, test_arena, test_context, true);
        });
    });

    std::vector<std::thread> high_priority_threads;
    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads.emplace_back(high_priority_thread_func);
    }

    // This thread participates as the final high-priority submitter.
    utils::increase_thread_priority();
    while (task_counter++ < critical_task_counter) {
        submit(critical_task, test_arena, test_context, true);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    tbb::detail::d1::wait(wait, test_context);

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads[i].join();
    }
    obsr.observe(false);
}
804 
805 // Explicit test for raii_guard move ctor because of copy elision optimization
806 // TODO: consider better test file for the test case
807 //! \brief \ref interface
808 TEST_CASE("raii_guard move ctor") {
809     int count{0};
810     auto func = [&count] {
811         count++;
812         CHECK(count == 1);
813     };
814 
815     tbb::detail::d0::raii_guard<decltype(func)> guard1(func);
816     tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1));
817 }
818