xref: /oneTBB/test/tbb/test_task.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/spin_barrier.h"
20 #include "common/utils_concurrency_limit.h"
21 #include "common/cpu_usertime.h"
22 
23 #include "tbb/task.h"
24 #include "tbb/task_group.h"
25 #include "tbb/parallel_for.h"
26 #include "tbb/cache_aligned_allocator.h"
27 #include "tbb/global_control.h"
28 #include "tbb/concurrent_vector.h"
29 
30 #include <atomic>
31 #include <thread>
32 #include <thread>
33 
34 //! \file test_task.cpp
35 //! \brief Test for [internal] functionality
36 
//! Callable that does nothing; it is the default Body argument of CountingTask.
struct EmptyBody {
    // Intentionally a no-op.
    void operator()() const {
    }
};
40 
41 #if _MSC_VER && !defined(__INTEL_COMPILER)
42 // unreachable code
43 #pragma warning( push )
44 #pragma warning( disable: 4702 )
45 #endif
46 
47 template <typename Body = EmptyBody>
48 class CountingTask : public tbb::detail::d1::task {
49 public:
50     CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}
51 
52     CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}
53 
54     task* execute( tbb::detail::d1::execution_data& ) override {
55         ++my_execute_counter;
56         my_body();
57         my_wait.release();
58         return nullptr;
59     }
60 
61     task* cancel( tbb::detail::d1::execution_data& ) override {
62         ++my_cancel_counter;
63         my_wait.release();
64         return nullptr;
65     }
66 
67     static void reset() {
68         my_execute_counter = 0;
69         my_cancel_counter = 0;
70     }
71 
72     static std::size_t execute_counter() { return my_execute_counter; }
73     static std::size_t cancel_counter() { return my_cancel_counter; }
74 
75 private:
76     Body my_body;
77     tbb::detail::d1::wait_context& my_wait;
78 
79     static std::atomic<std::size_t> my_execute_counter;
80     static std::atomic<std::size_t> my_cancel_counter;
81 }; // struct CountingTask
82 
83 
84 #if _MSC_VER && !defined(__INTEL_COMPILER)
85 #pragma warning( pop )
86 #endif // warning 4702 is back
87 
88 template <typename Body>
89 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);
90 
91 template <typename Body>
92 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);
93 
94 #if TBB_USE_EXCEPTIONS
95 void test_cancellation_on_exception( bool reset_ctx ) {
96     tbb::detail::d1::wait_context wait(1);
97     tbb::task_group_context test_context;
98     auto throw_body = [] {
99         throw 1;
100     };
101     CountingTask<decltype(throw_body)> task(throw_body, wait);
102 
103     constexpr std::size_t iter_counter = 1000;
104     for (std::size_t i = 0; i < iter_counter; ++i) {
105         try {
106             tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
107         } catch(int ex) {
108             REQUIRE(ex == 1);
109         }
110         if (reset_ctx) {
111             test_context.reset();
112         }
113         wait.reserve(1);
114     }
115     wait.release(1);
116 
117     REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
118     REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs");
119     task.reset();
120 }
121 #endif // TBB_USE_EXCEPTIONS
122 
123 //! \brief \ref error_guessing
124 TEST_CASE("Test that task was executed p times") {
125     tbb::detail::d1::wait_context wait(1);
126     tbb::task_group_context test_context;
127     CountingTask<> test_task(wait);
128 
129     constexpr std::size_t iter_counter = 10000;
130     for (std::size_t i = 0; i < iter_counter; ++i) {
131         tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
132         wait.reserve(1);
133     }
134 
135     wait.release(1);
136 
137     REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times");
138     REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
139     CountingTask<>::reset();
140 }
141 
142 #if TBB_USE_EXCEPTIONS
143 //! \brief \ref error_guessing
144 TEST_CASE("Test cancellation on exception") {
145     test_cancellation_on_exception(/*reset_ctx = */true);
146     test_cancellation_on_exception(/*reset_ctx = */false);
147 }
148 #endif // TBB_USE_EXCEPTIONS
149 
150 //! \brief \ref error_guessing
151 TEST_CASE("Simple test parallelism usage") {
152     std::size_t threads_num = utils::get_platform_max_threads();
153     utils::SpinBarrier barrier(threads_num);
154 
155     auto barrier_wait = [&barrier] {
156         barrier.wait();
157     };
158 
159     tbb::detail::d1::wait_context wait(threads_num);
160     tbb::detail::d1::task_group_context test_context;
161     using task_type = CountingTask<decltype(barrier_wait)>;
162 
163     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
164 
165     constexpr std::size_t iter_counter = 100;
166     for (std::size_t i = 0; i < iter_counter; ++i) {
167         for (std::size_t j = 0; j < threads_num; ++j) {
168             tbb::detail::d1::spawn(vector_test_task[j], test_context);
169         }
170         tbb::detail::d1::wait(wait, test_context);
171         wait.reserve(threads_num);
172     }
173     wait.release(threads_num);
174 
175     REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
176     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
177     task_type::reset();
178 }
179 
180 //! \brief \ref error_guessing
181 TEST_CASE("Test parallelism usage with parallel_for") {
182     std::size_t task_threads_num = utils::get_platform_max_threads();
183     utils::SpinBarrier barrier(task_threads_num);
184 
185     auto barrier_wait = [&barrier] {
186         barrier.wait();
187     };
188 
189     std::size_t pfor_iter_count = 10000;
190     std::atomic<std::size_t> pfor_counter(0);
191 
192     auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
193         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
194                           [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
195                               for (auto it = range.begin(); it != range.end(); ++it) {
196                                   ++pfor_counter;
197                               }
198                            }
199         );
200     };
201 
202     tbb::detail::d1::wait_context wait(task_threads_num);
203     tbb::detail::d1::task_group_context test_context;
204     using task_type = CountingTask<decltype(barrier_wait)>;
205     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));
206 
207     constexpr std::size_t iter_count = 10;
208     constexpr std::size_t pfor_threads_num = 4;
209     for (std::size_t i = 0; i < iter_count; ++i) {
210         std::vector<std::thread> pfor_threads;
211 
212         for (std::size_t j = 0; j < task_threads_num; ++j) {
213             tbb::detail::d1::spawn(vector_test_task[j], test_context);
214         }
215 
216         for (std::size_t k = 0; k < pfor_threads_num; ++k) {
217             pfor_threads.emplace_back(parallel_for_func);
218         }
219 
220         tbb::detail::d1::wait(wait, test_context);
221 
222         for (auto& thread : pfor_threads) {
223             if (thread.joinable()) {
224                 thread.join();
225             }
226         }
227 
228         wait.reserve(task_threads_num);
229     }
230     wait.release(task_threads_num);
231 
232     REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
233     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
234     REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished");
235     task_type::reset();
236 }
237 
238 //! \brief \ref error_guessing
239 TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
240     std::size_t threads_num = utils::get_platform_max_threads();
241     utils::SpinBarrier barrier(threads_num);
242 
243     auto barrier_wait = [&barrier] {
244         barrier.wait();
245     };
246 
247     tbb::detail::d1::wait_context wait(threads_num);
248     tbb::detail::d1::task_group_context test_context;
249     using task_type = CountingTask<decltype(barrier_wait)>;
250     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
251 
252     auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
253         tbb::detail::d1::spawn(vector_test_task[idx], test_context);
254     };
255 
256     constexpr std::size_t iter_count = 10;
257     for (std::size_t i = 0; i < iter_count; ++i) {
258         std::vector<std::thread> threads;
259 
260         for (std::size_t k = 0; k < threads_num - 1; ++k) {
261             threads.emplace_back(thread_func, k);
262         }
263 
264         for (auto& thread : threads) {
265             if (thread.joinable()) {
266                 thread.join();
267             }
268         }
269 
270         tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
271         wait.reserve(threads_num);
272     }
273     wait.release(threads_num);
274 
275     REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
276     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
277     task_type::reset();
278 }
279 
280 class SpawningTaskBody;
281 
282 using SpawningTask = CountingTask<SpawningTaskBody>;
283 
284 class SpawningTaskBody {
285 public:
286     using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;
287 
288     SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
289         : my_task_pool(task_pool), my_test_ctx(test_ctx) {}
290 
291     void operator()() const {
292         std::size_t delta = 7;
293         std::size_t start_idx = my_current_task.fetch_add(delta);
294 
295         if (start_idx < my_task_pool.size()) {
296             for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
297                 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
298             }
299         }
300     }
301 private:
302     task_pool_type& my_task_pool;
303     tbb::task_group_context& my_test_ctx;
304     static std::atomic<std::size_t> my_current_task;
305 }; // class SpawningTaskBody
306 
307 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);
308 
309 //! \brief \ref error_guessing
310 TEST_CASE("Actively adding tasks") {
311     std::size_t task_number = 500 * utils::get_platform_max_threads();
312 
313     tbb::detail::d1::wait_context wait(task_number + 1);
314     tbb::task_group_context test_context;
315 
316     SpawningTaskBody::task_pool_type task_pool;
317 
318     SpawningTaskBody task_body{task_pool, test_context};
319     for (std::size_t i = 0; i < task_number; ++i) {
320         task_pool.emplace_back(task_body, wait);
321     }
322 
323     SpawningTask first_task(task_body, wait);
324     tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);
325 
326     REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right?
327     REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
328 }
329 
330 #if __TBB_RESUMABLE_TASKS
331 
332 struct suspended_task : public tbb::detail::d1::task {
333 
334     suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
335         : my_suspend_tag(tag), my_wait(wait)
336     {}
337 
338     task* execute(tbb::detail::d1::execution_data&) override {
339         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
340             [] (const tbb::blocked_range<std::size_t>& range) {
341                 // Make some heavy work
342                 std::atomic<int> sum{};
343                 for (auto it = range.begin(); it != range.end(); ++it) {
344                     ++sum;
345                 }
346             },
347             tbb::static_partitioner{}
348         );
349 
350         my_wait.release();
351         tbb::task::resume(my_suspend_tag);
352         return nullptr;
353     }
354 
355     task* cancel(tbb::detail::d1::execution_data&) override {
356         FAIL("The function should never be called.");
357         return nullptr;
358     }
359 
360     tbb::task::suspend_point my_suspend_tag;
361     tbb::detail::d1::wait_context& my_wait;
362 };
363 
364 //! \brief \ref error_guessing
365 TEST_CASE("Isolation + resumable tasks") {
366     std::atomic<int> suspend_flag{};
367     tbb::task_group_context test_context;
368 
369     std::atomic<int> suspend_count{};
370     std::atomic<int> resume_count{};
371 
372     tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
373         [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
374             int ticket = 0;
375             for (auto it = range.begin(); it != range.end(); ++it) {
376                 ticket = suspend_flag++;
377             }
378 
379             if (ticket % 5 == 0) {
380                 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
381                 tbb::detail::d1::wait_context wait(1);
382                 ++suspend_count;
383                 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
384                     tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) {
385                         test_task.emplace_back(tag, wait);
386                         tbb::detail::d1::spawn(test_task[0], test_context);
387                     });
388                     }
389                 );
390                 tbb::detail::d1::wait(wait, test_context);
391                 ++resume_count;
392             }
393         }
394     );
395 
396     CHECK(suspend_count == resume_count);
397 }
398 
399 struct bypass_task : public tbb::detail::d1::task {
400     using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;
401 
402     bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
403                 std::atomic<bool>& resume_flag, tbb::task::suspend_point& suspend_tag)
404         : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
405     {}
406 
407     task* execute(tbb::detail::d1::execution_data&) override {
408         std::atomic<int> sum{};
409 
410         // Make some heavy work
411         for (std::size_t i = 0; i < 100000; ++i) {
412             ++sum;
413         }
414 
415         my_wait.release();
416 
417         if (my_resume_flag.exchange(false)) {
418             tbb::task::resume(my_suspend_tag);
419         }
420 
421         std::size_t ticket = my_current_task++;
422         return ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;
423     }
424 
425     task* cancel(tbb::detail::d1::execution_data&) override {
426         FAIL("The function should never be called.");
427         return nullptr;
428     }
429 
430     tbb::detail::d1::wait_context& my_wait;
431     task_pool_type& my_task_pool;
432     std::atomic<bool>& my_resume_flag;
433     tbb::task::suspend_point& my_suspend_tag;
434     static std::atomic<int> my_current_task;
435 };
436 
437 std::atomic<int> bypass_task::my_current_task(0);
438 
// Per-thread marker; zero on every thread until that thread assigns another value.
thread_local int test_tls{0};
440 
441 //! \brief \ref error_guessing
442 TEST_CASE("Bypass suspended by resume") {
443     std::size_t task_number = 500 * utils::get_platform_max_threads();
444     tbb::task_group_context test_context;
445     tbb::detail::d1::wait_context wait(task_number + 1);
446 
447     test_tls = 1;
448 
449     std::atomic<bool> resume_flag{false};
450     tbb::task::suspend_point test_suspend_tag;
451 
452     std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;
453 
454     for (std::size_t i = 0; i < task_number; ++i) {
455         test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
456     }
457 
458     for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
459         tbb::detail::d1::spawn(test_task_pool[bypass_task::my_current_task++], test_context);
460     }
461 
462     auto suspend_func = [&resume_flag, &test_suspend_tag] {
463         tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
464             test_suspend_tag = tag;
465             resume_flag.store(true, std::memory_order_release);
466         });
467     };
468     using task_type = CountingTask<decltype(suspend_func)>;
469     task_type suspend_task(suspend_func, wait);
470 
471     tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
472     CHECK(bypass_task::my_current_task >= test_task_pool.size());
473     REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
474 }
475 
476 //! \brief \ref error_guessing
477 TEST_CASE("Critical tasks + resume") {
478     std::size_t task_number = 500 * utils::get_platform_max_threads();
479 
480     tbb::task_group_context test_context;
481     tbb::detail::d1::wait_context wait(task_number);
482 
483     tbb::task_arena test_arena;
484 
485     test_arena.initialize();
486 
487     std::atomic<bool> resume_flag{};
488     tbb::task::suspend_point test_suspend_tag;
489 
490     auto task_body = [&resume_flag, &test_suspend_tag] {
491         std::atomic<int> sum{};
492 
493         // Make some work
494         for (std::size_t i = 0; i < 1000; ++i) {
495             ++sum;
496         }
497 
498         if (resume_flag.exchange(false)) {
499             tbb::task::resume(test_suspend_tag);
500         }
501     };
502 
503     using task_type = CountingTask<decltype(task_body)>;
504     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
505 
506     for (std::size_t i = 0; i < task_number; ++i) {
507         test_tasks.emplace_back(task_body, wait);
508     }
509 
510     for (std::size_t i = 0; i < task_number / 2; ++i) {
511         submit(test_tasks[i], test_arena, test_context, true);
512     }
513 
514     auto suspend_func = [&resume_flag, &test_suspend_tag] {
515         tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
516             test_suspend_tag = tag;
517             resume_flag.store(true, std::memory_order_release);
518         });
519     };
520     using suspend_task_type = CountingTask<decltype(suspend_func)>;
521     suspend_task_type suspend_task(suspend_func, wait);
522 
523     submit(suspend_task, test_arena, test_context, true);
524 
525     test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] {
526     tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] {
527         tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1),
528             [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
529                 for (std::size_t i = range.begin(); i != range.end(); ++i) {
530                     submit(test_tasks[i], test_arena, test_context, true);
531                 }
532             });
533         });
534     });
535 
536     tbb::detail::d1::wait(wait, test_context);
537 }
538 
539 //! \brief \ref error_guessing
540 TEST_CASE("Stress testing") {
541     std::size_t task_number = utils::get_platform_max_threads();
542 
543     tbb::task_group_context test_context;
544     tbb::detail::d1::wait_context wait(task_number);
545 
546     tbb::task_arena test_arena;
547 
548     test_arena.initialize();
549 
550     auto task_body = [] {
551         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
552             std::atomic<int> sum{};
553             // Make some work
554             for (std::size_t i = 0; i < 100; ++i) {
555                 ++sum;
556             }
557         });
558     };
559     using task_type = CountingTask<decltype(task_body)>;
560 
561     std::size_t iter_counter = 20;
562 
563     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
564 
565     for (std::size_t j = 0; j < task_number; ++j) {
566         test_tasks.emplace_back(task_body, wait);
567     }
568 
569     test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
570         for (std::size_t i = 0; i < iter_counter; ++i) {
571 
572             for (std::size_t j = 0; j < task_number; ++j) {
573                 test_arena.enqueue(task_body);
574             }
575 
576             for (std::size_t j = 0; j < task_number / 2; ++j) {
577                 tbb::detail::d1::spawn(test_tasks[j], test_context);
578             }
579 
580             for (std::size_t j = task_number / 2; j < task_number; ++j) {
581                 submit(test_tasks[j], test_arena, test_context, true);
582             }
583 
584             tbb::detail::d1::wait(wait, test_context);
585             wait.reserve(task_number);
586         }
587         wait.release(task_number);
588     });
589 
590 
591     REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
592     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
593 }
594 
595 //! \brief \ref error_guessing
596 TEST_CASE("All workers sleep") {
597     std::size_t thread_number = utils::get_platform_max_threads();
598     tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;
599 
600     tbb::task_group test_gr;
601 
602     utils::SpinBarrier barrier(thread_number);
603     auto resumble_task = [&] {
604         barrier.wait();
605         tbb::task::suspend([&] (tbb::task::suspend_point sp) {
606             suspend_points.push_back(sp);
607             barrier.wait();
608         });
609     };
610 
611     for (std::size_t i = 0; i < thread_number - 1; ++i) {
612         test_gr.run(resumble_task);
613     }
614 
615     barrier.wait();
616     barrier.wait();
617     TestCPUUserTime(thread_number);
618 
619     for (auto sp : suspend_points)
620         tbb::task::resume(sp);
621     test_gr.wait();
622 }
623 
624 //! \brief \ref error_guessing
625 TEST_CASE("External threads sleep") {
626     if (utils::get_platform_max_threads() < 2) return;
627     utils::SpinBarrier barrier(2);
628 
629     tbb::task_group test_gr;
630 
631     test_gr.run([&] {
632         barrier.wait();
633         TestCPUUserTime(2);
634     });
635 
636     barrier.wait();
637 
638     test_gr.wait();
639 }
640 
641 #endif // __TBB_RESUMABLE_TASKS
642 
643 //! \brief \ref error_guessing
644 TEST_CASE("Enqueue with exception") {
645     std::size_t task_number = 500 * utils::get_platform_max_threads();
646 
647     tbb::task_group_context test_context;
648     tbb::detail::d1::wait_context wait(task_number);
649 
650     tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};
651 
652     test_arena.initialize();
653 
654     auto task_body = [] {
655         std::atomic<int> sum{};
656         // Make some work
657         for (std::size_t i = 0; i < 100; ++i) {
658             ++sum;
659         }
660     };
661 
662     std::atomic<bool> end_flag{false};
663     auto check_body = [&end_flag] {
664         end_flag.store(true, std::memory_order_relaxed);
665     };
666 
667     using task_type = CountingTask<decltype(task_body)>;
668     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
669 
670     for (std::size_t j = 0; j < task_number; ++j) {
671         test_tasks.emplace_back(task_body, wait);
672     }
673 
674     {
675         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
676         test_arena.enqueue(task_body);
677         // Initialize implicit arena
678         tbb::parallel_for(0, 1, [] (int) {});
679         tbb::task_arena test_arena2(tbb::task_arena::attach{});
680         test_arena2.enqueue(task_body);
681     }
682 
683     constexpr std::size_t iter_count = 10;
684     for (std::size_t k = 0; k < iter_count; ++k) {
685         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
686         test_arena.enqueue(check_body);
687 
688         while (!end_flag.load(std::memory_order_relaxed)) ;
689 
690         utils::Sleep(1);
691         end_flag.store(false, std::memory_order_relaxed);
692 
693         test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
694             for (std::size_t j = 0; j < task_number; ++j) {
695                 tbb::detail::d1::spawn(test_tasks[j], test_context);
696             }
697 
698             tbb::detail::d1::wait(wait, test_context);
699             wait.reserve(task_number);
700         });
701     }
702     wait.release(task_number);
703 
704 
705     REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
706     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
707 }
708