// xref: /oneTBB/test/tbb/test_task.cpp (revision 2a6bd1e1)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/dummy_body.h"
20 #include "common/spin_barrier.h"
21 #include "common/utils_concurrency_limit.h"
22 #include "common/cpu_usertime.h"
23 
24 #include "tbb/task.h"
25 #include "tbb/task_group.h"
26 #include "tbb/parallel_for.h"
27 #include "tbb/cache_aligned_allocator.h"
28 #include "tbb/global_control.h"
29 #include "tbb/concurrent_vector.h"
30 
31 #include <atomic>
32 #include <thread>
33 #include <thread>
34 
35 //! \file test_task.cpp
36 //! \brief Test for [internal] functionality
// No-op functor: the default body for CountingTask when a test only
// needs execution/cancellation counting and no extra work.
struct EmptyBody {
    void operator()() const { /* intentionally empty */ }
};
40 
41 #if _MSC_VER && !defined(__INTEL_COMPILER)
42 // unreachable code
43 #pragma warning( push )
44 #pragma warning( disable: 4702 )
45 #endif
46 
// Test task that counts how many times it was executed or canceled.
// On execution it runs the supplied Body; both execute() and cancel()
// release one reference of the associated wait_context, so a waiter is
// unblocked regardless of the outcome.
// NOTE: the counters are static, i.e. shared by every instance with the
// same Body type; call reset() between independent checks.
template <typename Body = EmptyBody>
class CountingTask : public tbb::detail::d1::task {
public:
    CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}

    CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}

    // Called by the scheduler: count the call, run the body, signal
    // completion. Never returns a bypass task.
    task* execute( tbb::detail::d1::execution_data& ) override {
        ++my_execute_counter;
        my_body();
        my_wait.release();
        return nullptr;
    }

    // Called instead of execute() when the task's group is canceled
    // (e.g. after an exception); still releases the wait_context.
    task* cancel( tbb::detail::d1::execution_data& ) override {
        ++my_cancel_counter;
        my_wait.release();
        return nullptr;
    }

    // Zero both shared counters so the next check starts clean.
    static void reset() {
        my_execute_counter = 0;
        my_cancel_counter = 0;
    }

    static std::size_t execute_counter() { return my_execute_counter; }
    static std::size_t cancel_counter() { return my_cancel_counter; }

private:
    Body my_body;
    tbb::detail::d1::wait_context& my_wait;

    // Shared across all instances of the same specialization.
    static std::atomic<std::size_t> my_execute_counter;
    static std::atomic<std::size_t> my_cancel_counter;
}; // struct CountingTask
82 
83 
84 #if _MSC_VER && !defined(__INTEL_COMPILER)
85 #pragma warning( pop )
86 #endif // warning 4702 is back
87 
// Out-of-line definitions of CountingTask's static counters
// (one pair per Body specialization).
template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);
93 
94 #if TBB_USE_EXCEPTIONS
// Executes a task whose body always throws and checks that subsequent
// runs are canceled rather than executed.
// reset_ctx == true : the context is reset each iteration, so every
//                     iteration executes the task again.
// reset_ctx == false: the context stays canceled after the first throw,
//                     so only the first iteration executes the task.
void test_cancellation_on_exception( bool reset_ctx ) {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    auto throw_body = [] {
        throw 1;
    };
    CountingTask<decltype(throw_body)> task(throw_body, wait);

    constexpr std::size_t iter_counter = 1000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        try {
            tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
        } catch(int ex) {
            REQUIRE(ex == 1);
        }
        if (reset_ctx) {
            test_context.reset();
        }
        wait.reserve(1); // re-arm the wait context for the next iteration
    }
    wait.release(1); // drop the extra reference taken on the last iteration

    REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
    REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs");
    task.reset();
}
121 #endif // TBB_USE_EXCEPTIONS
122 
123 //! \brief \ref error_guessing
124 TEST_CASE("External threads sleep") {
125     if (utils::get_platform_max_threads() < 2) return;
126     utils::SpinBarrier barrier(2);
127 
128     tbb::task_group test_gr;
129 
130     test_gr.run([&] {
131         barrier.wait();
132         TestCPUUserTime(2);
133     });
134 
135     barrier.wait();
136 
137     test_gr.wait();
138 }
139 
140 //! \brief \ref error_guessing
141 TEST_CASE("Test that task was executed p times") {
142     tbb::detail::d1::wait_context wait(1);
143     tbb::task_group_context test_context;
144     CountingTask<> test_task(wait);
145 
146     constexpr std::size_t iter_counter = 10000;
147     for (std::size_t i = 0; i < iter_counter; ++i) {
148         tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
149         wait.reserve(1);
150     }
151 
152     wait.release(1);
153 
154     REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times");
155     REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
156     CountingTask<>::reset();
157 }
158 
159 #if TBB_USE_EXCEPTIONS
//! \brief \ref error_guessing
// Runs the cancellation-after-exception scenario in both modes: with
// and without resetting the context between iterations.
TEST_CASE("Test cancellation on exception") {
    test_cancellation_on_exception(/*reset_ctx = */true);
    test_cancellation_on_exception(/*reset_ctx = */false);
}
165 #endif // TBB_USE_EXCEPTIONS
166 
//! \brief \ref error_guessing
TEST_CASE("Simple test parallelism usage") {
    // One task per hardware thread; each task blocks on a shared
    // barrier, so an iteration can only finish if all tasks run
    // concurrently.
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_counter = 100;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        for (std::size_t j = 0; j < threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }
        tbb::detail::d1::wait(wait, test_context);
        wait.reserve(threads_num); // re-arm for the next iteration
    }
    wait.release(threads_num); // drop the references taken on the last iteration

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}
196 
//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with parallel_for") {
    // Barrier-synchronized tasks occupy the arena's threads while
    // external threads concurrently run parallel_for loops; both kinds
    // of work must make progress and complete.
    std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(task_threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    std::size_t pfor_iter_count = 10000;
    std::atomic<std::size_t> pfor_counter(0);

    // Work performed by each external thread: a parallel_for that
    // increments the shared counter once per iteration.
    auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
                          [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
                              for (auto it = range.begin(); it != range.end(); ++it) {
                                  ++pfor_counter;
                              }
                           }
        );
    };

    tbb::detail::d1::wait_context wait(task_threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_count = 10;
    constexpr std::size_t pfor_threads_num = 4;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> pfor_threads;

        for (std::size_t j = 0; j < task_threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }

        for (std::size_t k = 0; k < pfor_threads_num; ++k) {
            pfor_threads.emplace_back(parallel_for_func);
        }

        tbb::detail::d1::wait(wait, test_context);

        for (auto& thread : pfor_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        wait.reserve(task_threads_num); // re-arm for the next iteration
    }
    wait.release(task_threads_num); // drop the references taken on the last iteration

    REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished");
    task_type::reset();
}
254 
//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
    // Spawning is done from external std::threads; the main thread
    // executes the last task and waits for the rest.
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    // Each external thread spawns one task from the shared pool.
    auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
        tbb::detail::d1::spawn(vector_test_task[idx], test_context);
    };

    constexpr std::size_t iter_count = 10;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> threads;

        for (std::size_t k = 0; k < threads_num - 1; ++k) {
            threads.emplace_back(thread_func, k);
        }

        for (auto& thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
        wait.reserve(threads_num); // re-arm for the next iteration
    }
    wait.release(threads_num); // drop the references taken on the last iteration

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}
296 
// Forward declaration: the task alias and its body reference each other.
class SpawningTaskBody;

using SpawningTask = CountingTask<SpawningTaskBody>;

// Body that makes each executed task spawn up to `delta` further tasks
// from a shared pool, until the pool is exhausted.
class SpawningTaskBody {
public:
    using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;

    SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
        : my_task_pool(task_pool), my_test_ctx(test_ctx) {}

    void operator()() const {
        std::size_t delta = 7;
        // Atomically claim the next `delta` pool slots for this task.
        std::size_t start_idx = my_current_task.fetch_add(delta);

        if (start_idx < my_task_pool.size()) {
            for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
                tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
            }
        }
    }
private:
    task_pool_type& my_task_pool;
    tbb::task_group_context& my_test_ctx;
    // Next unclaimed index into the pool, shared by all body copies.
    static std::atomic<std::size_t> my_current_task;
}; // class SpawningTaskBody

std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);
325 
//! \brief \ref error_guessing
TEST_CASE("Actively adding tasks") {
    // Tasks are added while the scheduler is already running: the first
    // task transitively spawns the entire pool (see SpawningTaskBody).
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    // +1 accounts for first_task in addition to the pool tasks.
    tbb::detail::d1::wait_context wait(task_number + 1);
    tbb::task_group_context test_context;

    SpawningTaskBody::task_pool_type task_pool;

    // The whole pool is filled before any task is spawned, so the
    // references held by executing tasks stay valid.
    SpawningTaskBody task_body{task_pool, test_context};
    for (std::size_t i = 0; i < task_number; ++i) {
        task_pool.emplace_back(task_body, wait);
    }

    SpawningTask first_task(task_body, wait);
    tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);

    // Every pool task plus first_task must have executed exactly once.
    REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed");
    REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
}
346 
347 #if __TBB_RESUMABLE_TASKS
// Task that performs CPU-heavy work and then resumes a previously
// captured suspend point before completing.
struct suspended_task : public tbb::detail::d1::task {

    suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
        : my_suspend_tag(tag), my_wait(wait)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
            [] (const tbb::blocked_range<std::size_t>& range) {
                // Make some heavy work
                std::atomic<int> sum{};
                for (auto it = range.begin(); it != range.end(); ++it) {
                    ++sum;
                }
            },
            tbb::static_partitioner{}
        );

        my_wait.release();
        // Wake up the thread suspended at my_suspend_tag.
        tbb::task::resume(my_suspend_tag);
        return nullptr;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::task::suspend_point my_suspend_tag;
    tbb::detail::d1::wait_context& my_wait;
};
379 
//! \brief \ref error_guessing
TEST_CASE("Isolation + resumable tasks") {
    // Suspends worker threads inside an isolated region and checks that
    // every suspension is matched by a resume.
    std::atomic<int> suspend_flag{};
    tbb::task_group_context test_context;

    std::atomic<int> suspend_count{};
    std::atomic<int> resume_count{};

    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
        [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
            int ticket = 0;
            for (auto it = range.begin(); it != range.end(); ++it) {
                ticket = suspend_flag++;
            }

            // Suspend on roughly every fifth ticket.
            if (ticket % 5 == 0) {
                std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
                tbb::detail::d1::wait_context wait(1);
                ++suspend_count;
                tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
                    auto thread_id = std::this_thread::get_id();
                    tbb::task::suspend([&wait, &test_context, &test_task, thread_id] (tbb::task::suspend_point tag) {
                        // The suspend callback must run on the suspending thread.
                        CHECK(thread_id == std::this_thread::get_id());
                        test_task.emplace_back(tag, wait);
                        tbb::detail::d1::spawn(test_task[0], test_context);
                    });
                    }
                );
                tbb::detail::d1::wait(wait, test_context);
                ++resume_count;
            }
        }
    );

    CHECK(suspend_count == resume_count);
}
416 
// Task forming a bypass chain: each execution returns the next task
// from the pool (scheduler bypass) and makes sure the suspended thread
// is resumed exactly once.
struct bypass_task : public tbb::detail::d1::task {
    using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;

    bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
                std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
        : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        utils::doDummyWork(10000);

        // resume_flag states: 0 = not suspended yet, 1 = suspended,
        // 2 = resumed. Only the task that wins the CAS resumes.
        int expected = 1;
        if (my_resume_flag.compare_exchange_strong(expected, 2)) {
            tbb::task::resume(my_suspend_tag);
        }

        std::size_t ticket = my_current_task++;
        task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;

        if (!next && my_resume_flag != 2) {
            // Rarely all tasks can be executed before the suspend.
            // So, wait for the suspend before leaving.
            utils::SpinWaitWhileEq(my_resume_flag, 0);
            expected = 1;
            if (my_resume_flag.compare_exchange_strong(expected, 2)) {
                tbb::task::resume(my_suspend_tag);
            }
        }

        my_wait.release();
        return next; // bypass: the scheduler executes `next` immediately
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::detail::d1::wait_context& my_wait;
    task_pool_type& my_task_pool;
    std::atomic<int>& my_resume_flag;
    tbb::task::suspend_point& my_suspend_tag;
    // Index of the next pool task to hand out via bypass.
    static std::atomic<int> my_current_task;
};

std::atomic<int> bypass_task::my_current_task(0);
463 
// Set on the test's main thread; used to verify execute_and_wait
// returns on the same thread it was called from.
thread_local int test_tls = 0;

//! \brief \ref error_guessing
TEST_CASE("Bypass suspended by resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number + 1); // pool tasks + suspend_task

    test_tls = 1;

    std::atomic<int> resume_flag{0};
    tbb::task::suspend_point test_suspend_tag;

    std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
    }

    // Seed one bypass chain per available thread.
    for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
        std::size_t ticket = bypass_task::my_current_task++;
        if (ticket < test_task_pool.size()) {
            tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
        }
    }

    // The main task suspends itself; one of the bypass tasks resumes it.
    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        auto thread_id = std::this_thread::get_id();
        tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
            CHECK(thread_id == std::this_thread::get_id());
            test_suspend_tag = tag;
            resume_flag = 1;
        });
    };
    using task_type = CountingTask<decltype(suspend_func)>;
    task_type suspend_task(suspend_func, wait);

    tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
    CHECK(bypass_task::my_current_task >= test_task_pool.size());
    REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
}
505 
//! \brief \ref error_guessing
TEST_CASE("Critical tasks + resume") {
    // Floods an arena with critical tasks while one task suspends
    // itself; a subsequent critical task must resume it.
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait{ 0 };

    // The test expects at least one thread in test_arena
    int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads()));
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena);
    tbb::task_arena test_arena(num_threads_in_test_arena);

    test_arena.initialize();

    std::atomic<bool> resume_flag{}, resumed{};
    tbb::task::suspend_point test_suspend_tag;

    // Each task resumes the suspended thread if it is the first to
    // observe the flag set.
    auto task_body = [&resume_flag, &resumed, &test_suspend_tag] {
        // Make some work
        utils::doDummyWork(1000);

        if (resume_flag.exchange(false)) {
            tbb::task::resume(test_suspend_tag);
            resumed = true;
        }
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_tasks.emplace_back(task_body, wait);
    }

    // Submit the first half as critical tasks up front.
    wait.reserve(task_number / 2);
    for (std::size_t i = 0; i < task_number / 2; ++i) {
        submit(test_tasks[i], test_arena, test_context, true);
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        auto thread_id = std::this_thread::get_id();
        tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
            CHECK(thread_id == std::this_thread::get_id());
            test_suspend_tag = tag;
            resume_flag.store(true, std::memory_order_release);
        });
    };
    using suspend_task_type = CountingTask<decltype(suspend_func)>;
    suspend_task_type suspend_task(suspend_func, wait);

    wait.reserve(1);
    submit(suspend_task, test_arena, test_context, true);

    // Keep submitting the second half as critical tasks until the
    // resume has been observed.
    test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
        tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
            do {
                wait.reserve(task_number / 2);
                tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number),
                    [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
                        for (std::size_t i = range.begin(); i != range.end(); ++i) {
                            submit(test_tasks[i], test_arena, test_context, true);
                        }
                    }
                );
            } while (!resumed);
        });
    });

    // Wait for all submitted tasks from inside the arena.
    test_arena.execute([&wait, &test_context] {
        tbb::detail::d1::wait(wait, test_context);
    });
}
578 
//! \brief \ref error_guessing
TEST_CASE("Stress testing") {
    // Mixes enqueued lambdas, plain spawns and critical submissions in
    // the same arena over several rounds.
    std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena;

    test_arena.initialize();

    // Each counted task runs a nested parallel_for to create extra load.
    auto task_body = [] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
            utils::doDummyWork(100);
        });
    };
    using task_type = CountingTask<decltype(task_body)>;

    std::size_t iter_counter = 20;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
        for (std::size_t i = 0; i < iter_counter; ++i) {

            // Enqueued lambdas are not tracked by `wait`.
            for (std::size_t j = 0; j < task_number; ++j) {
                test_arena.enqueue(task_body);
            }

            // First half: plain spawn; second half: critical submission.
            for (std::size_t j = 0; j < task_number / 2; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            for (std::size_t j = task_number / 2; j < task_number; ++j) {
                submit(test_tasks[j], test_arena, test_context, true);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number); // re-arm for the next round
        }
        wait.release(task_number); // drop the references taken on the last round
    });


    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}
630 
631 //! \brief \ref error_guessing
632 TEST_CASE("All workers sleep") {
633     std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
634     tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;
635 
636     tbb::task_group test_gr;
637 
638     utils::SpinBarrier barrier(thread_number);
639     auto resumble_task = [&] {
640         barrier.wait();
641         auto thread_id = std::this_thread::get_id();
642         tbb::task::suspend([&] (tbb::task::suspend_point sp) {
643             CHECK(thread_id == std::this_thread::get_id());
644             suspend_points.push_back(sp);
645             barrier.wait();
646         });
647     };
648 
649     for (std::size_t i = 0; i < thread_number - 1; ++i) {
650         test_gr.run(resumble_task);
651     }
652 
653     barrier.wait();
654     barrier.wait();
655     TestCPUUserTime(thread_number);
656 
657     for (auto sp : suspend_points)
658         tbb::task::resume(sp);
659     test_gr.wait();
660 }
661 
662 #endif // __TBB_RESUMABLE_TASKS
663 
//! \brief \ref error_guessing
TEST_CASE("Enqueue with exception") {
    // Exercises enqueue while max parallelism is temporarily limited
    // to 1, then runs the tracked tasks at full concurrency.
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};

    test_arena.initialize();

    auto task_body = [] {
        utils::doDummyWork(100);
    };

    // check_body flips end_flag so the main thread can detect that the
    // enqueued work actually ran.
    std::atomic<bool> end_flag{false};
    auto check_body = [&end_flag] {
        end_flag.store(true, std::memory_order_relaxed);
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    {
        // Enqueue into both an explicit and the implicit arena while
        // only one thread is allowed.
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(task_body);
        // Initialize implicit arena
        tbb::parallel_for(0, 1, [] (int) {});
        tbb::task_arena test_arena2(tbb::task_arena::attach{});
        test_arena2.enqueue(task_body);
    }

    constexpr std::size_t iter_count = 10;
    for (std::size_t k = 0; k < iter_count; ++k) {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(check_body);

        // Busy-wait until the enqueued check_body has run.
        while (!end_flag.load(std::memory_order_relaxed)) ;

        utils::Sleep(1);
        end_flag.store(false, std::memory_order_relaxed);

        test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
            for (std::size_t j = 0; j < task_number; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number); // re-arm for the next iteration
        });
    }
    wait.release(task_number); // drop the references taken on the last iteration


    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}
725 
// Task that re-submits itself as a critical task until its countdown
// expires; used to keep arena workers permanently busy.
struct resubmitting_task : public tbb::detail::d1::task {
    tbb::task_arena& my_arena;
    tbb::task_group_context& my_ctx;
    std::atomic<int> counter{100000};

    resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
    {}

    tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override {
        if (counter-- > 0) {
            submit(*this, my_arena, my_ctx, true);
        }
        return nullptr;
    }

    tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override {
        FAIL("The function should never be called.");
        return nullptr;
    }
};
746 
//! \brief \ref error_guessing
TEST_CASE("Test with priority inversion") {
    // High-OS-priority external threads submit critical tasks while
    // normal-priority workers keep the arena saturated; the critical
    // work must still complete (no priority-inversion hang).
    if (!utils::can_change_thread_priority()) return;

    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);

    // Oversubscribed arena with `thread_number` reserved external slots.
    tbb::task_arena test_arena(2 * thread_number, thread_number);
    test_arena.initialize();
    utils::pinning_observer obsr(test_arena);
    CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");

    std::uint32_t  critical_task_counter = 1000 * thread_number;
    std::atomic<std::size_t> task_counter{0};

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(critical_task_counter);

    auto critical_work = [&] {
        utils::doDummyWork(10);
    };

    using suspend_task_type = CountingTask<decltype(critical_work)>;
    suspend_task_type critical_task(critical_work, wait);

    auto high_priority_thread_func = [&] {
        // Increase external threads priority
        utils::increased_priority_guard guard{};
        utils::suppress_unused_warning(guard);
        // pin external threads
        test_arena.execute([]{});
        while (task_counter++ < critical_task_counter) {
            submit(critical_task, test_arena, test_context, true);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    };

    resubmitting_task worker_task(test_arena, test_context);
    // warm up
    // take first core on execute
    utils::SpinBarrier barrier(thread_number + 1);
    test_arena.execute([&] {
        tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t) {
            barrier.wait();
            submit(worker_task, test_arena, test_context, true);
        });
    });

    std::vector<std::thread> high_priority_threads;
    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads.emplace_back(high_priority_thread_func);
    }

    // The main thread also submits at increased priority.
    utils::increased_priority_guard guard{};
    utils::suppress_unused_warning(guard);
    while (task_counter++ < critical_task_counter) {
        submit(critical_task, test_arena, test_context, true);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    tbb::detail::d1::wait(wait, test_context);

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads[i].join();
    }
}
813 
814 // Explicit test for raii_guard move ctor because of copy elision optimization
815 // TODO: consider better test file for the test case
816 //! \brief \ref interface
817 TEST_CASE("raii_guard move ctor") {
818     int count{0};
819     auto func = [&count] {
820         count++;
821         CHECK(count == 1);
822     };
823 
824     tbb::detail::d0::raii_guard<decltype(func)> guard1(func);
825     tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1));
826 }
827 
828 //! \brief \ref error_guessing
829 TEST_CASE("Check correct arena destruction with enqueue") {
830     for (int i = 0; i < 100; ++i) {
831         tbb::task_scheduler_handle handle{ tbb::attach{} };
832         {
833             tbb::task_arena a(2, 0);
834 
835             a.enqueue([] {
836                 tbb::parallel_for(0, 100, [] (int) { std::this_thread::sleep_for(std::chrono::nanoseconds(10)); });
837             });
838             std::this_thread::sleep_for(std::chrono::microseconds(1));
839         }
840         tbb::finalize(handle, std::nothrow_t{});
841     }
842 }
843