/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"
#include "common/utils.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"
#include "common/utils_concurrency_limit.h"
#include "common/cpu_usertime.h"

#include "tbb/task.h"
#include "tbb/task_group.h"
#include "tbb/parallel_for.h"
#include "tbb/cache_aligned_allocator.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_vector.h"

#include <atomic>
#include <thread>

//! \file test_task.cpp
//! \brief Test for [internal] functionality

struct EmptyBody {
    void operator()() const {}
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

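// Helper task used throughout this file: execute() runs the user-provided Body
// and cancel() does nothing but count; both paths bump a static per-Body atomic
// counter and release the associated wait_context so waiting threads can proceed.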
template <typename Body = EmptyBody>
class CountingTask : public tbb::detail::d1::task {
public:
    CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}

    CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}

    task* execute( tbb::detail::d1::execution_data& ) override {
        ++my_execute_counter;
        my_body();
        my_wait.release();
        return nullptr;
    }

    task* cancel( tbb::detail::d1::execution_data& ) override {
        ++my_cancel_counter;
        my_wait.release();
        return nullptr;
    }

    static void reset() {
        my_execute_counter = 0;
        my_cancel_counter = 0;
    }

    static std::size_t execute_counter() { return my_execute_counter; }
    static std::size_t cancel_counter() { return my_cancel_counter; }

private:
    Body my_body;
    tbb::detail::d1::wait_context& my_wait;

    static std::atomic<std::size_t> my_execute_counter;
    static std::atomic<std::size_t> my_cancel_counter;
}; // struct CountingTask


#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4702 is back

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);

#if TBB_USE_EXCEPTIONS
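// Repeatedly runs a task whose body throws. The REQUIRE_MESSAGE checks below
// expect the body to run on every iteration when reset_ctx is true (the context
// is reset after each exception) and only on the first iteration otherwise,
// while cancel() is expected to be reached on every iteration in both modes.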
void test_cancellation_on_exception( bool reset_ctx ) {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    auto throw_body = [] {
        throw 1;
    };
    CountingTask<decltype(throw_body)> task(throw_body, wait);

    constexpr std::size_t iter_counter = 1000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        try {
            tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
        } catch(int ex) {
            REQUIRE(ex == 1);
        }
        if (reset_ctx) {
            test_context.reset();
        }
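        // Re-arm the wait_context for the next iteration: the reference released
        // by execute()/cancel() is taken again here.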
        wait.reserve(1);
    }
    wait.release(1);

    REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
    REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurred");
    task.reset();
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref error_guessing
TEST_CASE("External threads sleep") {
    if (utils::get_platform_max_threads() < 2) return;
    utils::SpinBarrier barrier(2);

    tbb::task_group test_gr;

    test_gr.run([&] {
        barrier.wait();
        TestCPUUserTime(2);
    });

    barrier.wait();

    test_gr.wait();
}

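// Re-executes the same CountingTask instance through execute_and_wait on every
// iteration and verifies that execute() ran exactly once per iteration while
// cancel() was never reached.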
//! \brief \ref error_guessing
TEST_CASE("Test that task was executed p times") {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    CountingTask<> test_task(wait);

    constexpr std::size_t iter_counter = 10000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
        wait.reserve(1);
    }

    wait.release(1);

    REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed the expected number of times");
    REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
    CountingTask<>::reset();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref error_guessing
TEST_CASE("Test cancellation on exception") {
    test_cancellation_on_exception(/*reset_ctx = */true);
    test_cancellation_on_exception(/*reset_ctx = */false);
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref error_guessing
TEST_CASE("Simple test parallelism usage") {
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_counter = 100;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        for (std::size_t j = 0; j < threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }
        tbb::detail::d1::wait(wait, test_context);
        wait.reserve(threads_num);
    }
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}

//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with parallel_for") {
    std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(task_threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    std::size_t pfor_iter_count = 10000;
    std::atomic<std::size_t> pfor_counter(0);

    auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
                          [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
                              for (auto it = range.begin(); it != range.end(); ++it) {
                                  ++pfor_counter;
                              }
                          }
        );
    };

    tbb::detail::d1::wait_context wait(task_threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_count = 10;
    constexpr std::size_t pfor_threads_num = 4;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> pfor_threads;

        for (std::size_t j = 0; j < task_threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }

        for (std::size_t k = 0; k < pfor_threads_num; ++k) {
            pfor_threads.emplace_back(parallel_for_func);
        }

        tbb::detail::d1::wait(wait, test_context);

        for (auto& thread : pfor_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        wait.reserve(task_threads_num);
    }
    wait.release(task_threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread did not finish");
    task_type::reset();
}

//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
        tbb::detail::d1::spawn(vector_test_task[idx], test_context);
    };

    constexpr std::size_t iter_count = 10;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> threads;

        for (std::size_t k = 0; k < threads_num - 1; ++k) {
            threads.emplace_back(thread_func, k);
        }

        for (auto& thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
        wait.reserve(threads_num);
    }
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}

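// Body for tasks that actively add more work: each execution claims a chunk of
// up to `delta` tasks from a shared pool via an atomic cursor and spawns them,
// so new tasks keep arriving while the pool is being processed.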
class SpawningTaskBody;

using SpawningTask = CountingTask<SpawningTaskBody>;

class SpawningTaskBody {
public:
    using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;

    SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
        : my_task_pool(task_pool), my_test_ctx(test_ctx) {}

    void operator()() const {
        std::size_t delta = 7;
        std::size_t start_idx = my_current_task.fetch_add(delta);

        if (start_idx < my_task_pool.size()) {
            for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
                tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
            }
        }
    }
private:
    task_pool_type& my_task_pool;
    tbb::task_group_context& my_test_ctx;
    static std::atomic<std::size_t> my_current_task;
}; // class SpawningTaskBody

std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);

//! \brief \ref error_guessing
TEST_CASE("Actively adding tasks") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::detail::d1::wait_context wait(task_number + 1);
    tbb::task_group_context test_context;

    SpawningTaskBody::task_pool_type task_pool;

    SpawningTaskBody task_body{task_pool, test_context};
    for (std::size_t i = 0; i < task_number; ++i) {
        task_pool.emplace_back(task_body, wait);
    }

    SpawningTask first_task(task_body, wait);
    tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);

    REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // TODO: confirm that task_number + 1 is the expected count
    REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
}

#if __TBB_RESUMABLE_TASKS

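// Task that performs some parallel work, releases its wait_context, and then
// resumes the suspend point it was given at construction.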
struct suspended_task : public tbb::detail::d1::task {

    suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
        : my_suspend_tag(tag), my_wait(wait)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
            [] (const tbb::blocked_range<std::size_t>& range) {
                // Do some heavy work
                std::atomic<int> sum{};
                for (auto it = range.begin(); it != range.end(); ++it) {
                    ++sum;
                }
            },
            tbb::static_partitioner{}
        );

        my_wait.release();
        tbb::task::resume(my_suspend_tag);
        return nullptr;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::task::suspend_point my_suspend_tag;
    tbb::detail::d1::wait_context& my_wait;
};

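// Roughly every fifth parallel_for chunk suspends the current task inside an
// isolated region and hands the suspend point to a spawned suspended_task,
// which resumes it later; the test checks that suspends and resumes match.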
//! \brief \ref error_guessing
TEST_CASE("Isolation + resumable tasks") {
    std::atomic<int> suspend_flag{};
    tbb::task_group_context test_context;

    std::atomic<int> suspend_count{};
    std::atomic<int> resume_count{};

    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
        [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
            int ticket = 0;
            for (auto it = range.begin(); it != range.end(); ++it) {
                ticket = suspend_flag++;
            }

            if (ticket % 5 == 0) {
                std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
                tbb::detail::d1::wait_context wait(1);
                ++suspend_count;
                tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
                    tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) {
                        test_task.emplace_back(tag, wait);
                        tbb::detail::d1::spawn(test_task[0], test_context);
                    });
                });
                tbb::detail::d1::wait(wait, test_context);
                ++resume_count;
            }
        }
    );

    CHECK(suspend_count == resume_count);
}

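// Task that returns the next task from the pool as its bypass result (the
// pointer returned from execute()) and uses a compare-exchange on the shared
// resume flag so that exactly one task resumes the suspended thread.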
struct bypass_task : public tbb::detail::d1::task {
    using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;

    bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
                std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
        : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        utils::doDummyWork(10000);

        int expected = 1;
        if (my_resume_flag.compare_exchange_strong(expected, 2)) {
            tbb::task::resume(my_suspend_tag);
        }

        std::size_t ticket = my_current_task++;
        task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;

        if (!next && my_resume_flag != 2) {
            // Rarely, all tasks may have been executed before the suspend happened,
            // so wait for the suspend here before leaving.
            utils::SpinWaitWhileEq(my_resume_flag, 0);
            expected = 1;
            if (my_resume_flag.compare_exchange_strong(expected, 2)) {
                tbb::task::resume(my_suspend_tag);
            }
        }

        my_wait.release();
        return next;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::detail::d1::wait_context& my_wait;
    task_pool_type& my_task_pool;
    std::atomic<int>& my_resume_flag;
    tbb::task::suspend_point& my_suspend_tag;
    static std::atomic<int> my_current_task;
};

std::atomic<int> bypass_task::my_current_task(0);

thread_local int test_tls = 0;

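// test_tls is set on the thread that calls execute_and_wait below; checking it
// afterwards verifies that the wait returned on the same thread even though the
// thread was suspended and later resumed from one of the bypassed tasks.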
//! \brief \ref error_guessing
TEST_CASE("Bypass suspended by resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number + 1);

    test_tls = 1;

    std::atomic<int> resume_flag{0};
    tbb::task::suspend_point test_suspend_tag;

    std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
    }

    for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
        std::size_t ticket = bypass_task::my_current_task++;
        if (ticket < test_task_pool.size()) {
            tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
        }
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
            test_suspend_tag = tag;
            resume_flag = 1;
        });
    };
    using task_type = CountingTask<decltype(suspend_func)>;
    task_type suspend_task(suspend_func, wait);

    tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
    CHECK(bypass_task::my_current_task >= test_task_pool.size());
    REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
}

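// Submits half of the task pool up front and the rest from an isolated
// parallel_for inside the arena (the trailing `true` argument to submit appears
// to request critical priority), with a suspend task in between whose suspend
// point is resumed by one of the regular task bodies.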
//! \brief \ref error_guessing
TEST_CASE("Critical tasks + resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    // The test expects at least one worker thread in test_arena
    int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads()));
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena);
    tbb::task_arena test_arena(num_threads_in_test_arena);

    test_arena.initialize();

    std::atomic<bool> resume_flag{};
    tbb::task::suspend_point test_suspend_tag;

    auto task_body = [&resume_flag, &test_suspend_tag] {
        // Do some work
        utils::doDummyWork(1000);

        if (resume_flag.exchange(false)) {
            tbb::task::resume(test_suspend_tag);
        }
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_tasks.emplace_back(task_body, wait);
    }

    for (std::size_t i = 0; i < task_number / 2; ++i) {
        submit(test_tasks[i], test_arena, test_context, true);
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
            test_suspend_tag = tag;
            resume_flag.store(true, std::memory_order_release);
        });
    };
    using suspend_task_type = CountingTask<decltype(suspend_func)>;
    suspend_task_type suspend_task(suspend_func, wait);

    submit(suspend_task, test_arena, test_context, true);

    test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] {
        tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] {
            tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1),
                [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
                    for (std::size_t i = range.begin(); i != range.end(); ++i) {
                        submit(test_tasks[i], test_arena, test_context, true);
                    }
                });
        });
    });

    tbb::detail::d1::wait(wait, test_context);
}

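// Mixes the different submission paths in one arena over several iterations:
// enqueue() of plain functors, spawn() for half of the task pool, and submit()
// for the other half, each task body running a small parallel_for.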
//! \brief \ref error_guessing
TEST_CASE("Stress testing") {
    std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena;

    test_arena.initialize();

    auto task_body = [] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
            utils::doDummyWork(100);
        });
    };
    using task_type = CountingTask<decltype(task_body)>;

    std::size_t iter_counter = 20;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
        for (std::size_t i = 0; i < iter_counter; ++i) {

            for (std::size_t j = 0; j < task_number; ++j) {
                test_arena.enqueue(task_body);
            }

            for (std::size_t j = 0; j < task_number / 2; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            for (std::size_t j = task_number / 2; j < task_number; ++j) {
                submit(test_tasks[j], test_arena, test_context, true);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        }
        wait.release(task_number);
    });

    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}

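// thread_number - 1 tasks suspend themselves and park at a suspend point; while
// they are parked, TestCPUUserTime presumably verifies that the process is not
// spinning and burning CPU time, after which every captured suspend point is resumed.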
//! \brief \ref error_guessing
TEST_CASE("All workers sleep") {
    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;

    tbb::task_group test_gr;

    utils::SpinBarrier barrier(thread_number);
    auto resumable_task = [&] {
        barrier.wait();
        tbb::task::suspend([&] (tbb::task::suspend_point sp) {
            suspend_points.push_back(sp);
            barrier.wait();
        });
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        test_gr.run(resumable_task);
    }

    barrier.wait();
    barrier.wait();
    TestCPUUserTime(thread_number);

    for (auto sp : suspend_points)
        tbb::task::resume(sp);
    test_gr.wait();
}

#endif // __TBB_RESUMABLE_TASKS

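// Enqueues work while global_control limits parallelism to 1 (into an explicit
// arena and into an implicitly created one), then repeatedly spawns and waits
// for the whole task pool from inside the arena.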
//! \brief \ref error_guessing
TEST_CASE("Enqueue with exception") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};

    test_arena.initialize();

    auto task_body = [] {
        utils::doDummyWork(100);
    };

    std::atomic<bool> end_flag{false};
    auto check_body = [&end_flag] {
        end_flag.store(true, std::memory_order_relaxed);
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(task_body);
        // Initialize implicit arena
        tbb::parallel_for(0, 1, [] (int) {});
        tbb::task_arena test_arena2(tbb::task_arena::attach{});
        test_arena2.enqueue(task_body);
    }

    constexpr std::size_t iter_count = 10;
    for (std::size_t k = 0; k < iter_count; ++k) {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(check_body);

        while (!end_flag.load(std::memory_order_relaxed)) ;

        utils::Sleep(1);
        end_flag.store(false, std::memory_order_relaxed);

        test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
            for (std::size_t j = 0; j < task_number; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        });
    }
    wait.release(task_number);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}

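// Task that keeps resubmitting itself to its arena until the counter runs out,
// providing a steady stream of ordinary (non-critical) work for the priority
// inversion test below.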
struct resubmitting_task : public tbb::detail::d1::task {
    tbb::task_arena& my_arena;
    tbb::task_group_context& my_ctx;
    std::atomic<int> counter{100000};

    resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
    {}

    tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override {
        if (counter-- > 0) {
            submit(*this, my_arena, my_ctx, true);
        }
        return nullptr;
    }

    tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override {
        FAIL("The function should never be called.");
        return nullptr;
    }
};

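// High-priority external threads keep submitting critical_task into the arena
// while the arena workers stay busy with the self-resubmitting worker_task; the
// test passes only if the critical work is not starved, i.e. the wait below
// eventually returns.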
//! \brief \ref error_guessing
TEST_CASE("Test with priority inversion") {
    if (!utils::can_change_thread_priority()) return;

    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);

    tbb::task_arena test_arena(2 * thread_number, thread_number);
    test_arena.initialize();
    utils::pinning_observer obsr(test_arena);
    CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");

    std::uint32_t critical_task_counter = 1000 * thread_number;
    std::atomic<std::size_t> task_counter{0};

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(critical_task_counter);

    auto critical_work = [&] {
        utils::doDummyWork(10);
    };

    using suspend_task_type = CountingTask<decltype(critical_work)>;
    suspend_task_type critical_task(critical_work, wait);

    auto high_priority_thread_func = [&] {
        // Increase the priority of this external thread
        utils::increase_thread_priority();
        // Pin the external thread
        test_arena.execute([]{});
        while (task_counter++ < critical_task_counter) {
            submit(critical_task, test_arena, test_context, true);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    };

    resubmitting_task worker_task(test_arena, test_context);
    // warm up
    // take first core on execute
    utils::SpinBarrier barrier(thread_number + 1);
    test_arena.execute([&] {
        tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t&) {
            barrier.wait();
            submit(worker_task, test_arena, test_context, true);
        });
    });

    std::vector<std::thread> high_priority_threads;
    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads.emplace_back(high_priority_thread_func);
    }

    utils::increase_thread_priority();
    while (task_counter++ < critical_task_counter) {
        submit(critical_task, test_arena, test_context, true);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    tbb::detail::d1::wait(wait, test_context);

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads[i].join();
    }
    obsr.observe(false);
}

// Explicit test for the raii_guard move constructor, since copy elision may otherwise optimize the move away
// TODO: consider a better test file for this test case
//! \brief \ref interface
TEST_CASE("raii_guard move ctor") {
    int count{0};
    auto func = [&count] {
        count++;
        CHECK(count == 1);
    };

    tbb::detail::d0::raii_guard<decltype(func)> guard1(func);
    tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1));
}