xref: /oneTBB/test/tbb/test_task.cpp (revision 49e08aac)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "common/spin_barrier.h"
20 #include "common/utils_concurrency_limit.h"
21 #include "common/cpu_usertime.h"
22 
23 #include "tbb/task.h"
24 #include "tbb/task_group.h"
25 #include "tbb/parallel_for.h"
26 #include "tbb/cache_aligned_allocator.h"
27 #include "tbb/global_control.h"
28 #include "tbb/concurrent_vector.h"
29 
30 #include <atomic>
31 #include <thread>
32 #include <thread>
33 
34 //! \file test_task.cpp
35 //! \brief Test for [internal] functionality
36 
// Default body for CountingTask: a callable whose invocation does nothing.
struct EmptyBody {
    // Intentionally empty; lets a task be counted without doing any work.
    void operator()() const {
    }
};
40 
41 #if _MSC_VER && !defined(__INTEL_COMPILER)
42 // unreachable code
43 #pragma warning( push )
44 #pragma warning( disable: 4702 )
45 #endif
46 
47 template <typename Body = EmptyBody>
48 class CountingTask : public tbb::detail::d1::task {
49 public:
50     CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}
51 
52     CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}
53 
54     task* execute( tbb::detail::d1::execution_data& ) override {
55         ++my_execute_counter;
56         my_body();
57         my_wait.release();
58         return nullptr;
59     }
60 
61     task* cancel( tbb::detail::d1::execution_data& ) override {
62         ++my_cancel_counter;
63         my_wait.release();
64         return nullptr;
65     }
66 
67     static void reset() {
68         my_execute_counter = 0;
69         my_cancel_counter = 0;
70     }
71 
72     static std::size_t execute_counter() { return my_execute_counter; }
73     static std::size_t cancel_counter() { return my_cancel_counter; }
74 
75 private:
76     Body my_body;
77     tbb::detail::d1::wait_context& my_wait;
78 
79     static std::atomic<std::size_t> my_execute_counter;
80     static std::atomic<std::size_t> my_cancel_counter;
81 }; // struct CountingTask
82 
83 
84 #if _MSC_VER && !defined(__INTEL_COMPILER)
85 #pragma warning( pop )
86 #endif // warning 4702 is back
87 
88 template <typename Body>
89 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);
90 
91 template <typename Body>
92 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);
93 
94 #if TBB_USE_EXCEPTIONS
95 void test_cancellation_on_exception( bool reset_ctx ) {
96     tbb::detail::d1::wait_context wait(1);
97     tbb::task_group_context test_context;
98     auto throw_body = [] {
99         throw 1;
100     };
101     CountingTask<decltype(throw_body)> task(throw_body, wait);
102 
103     constexpr std::size_t iter_counter = 1000;
104     for (std::size_t i = 0; i < iter_counter; ++i) {
105         try {
106             tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
107         } catch(int ex) {
108             REQUIRE(ex == 1);
109         }
110         if (reset_ctx) {
111             test_context.reset();
112         }
113         wait.reserve(1);
114     }
115     wait.release(1);
116 
117     REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
118     REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs");
119     task.reset();
120 }
121 #endif // TBB_USE_EXCEPTIONS
122 
123 //! \brief \ref error_guessing
124 TEST_CASE("External threads sleep") {
125     if (utils::get_platform_max_threads() < 2) return;
126     utils::SpinBarrier barrier(2);
127 
128     tbb::task_group test_gr;
129 
130     test_gr.run([&] {
131         barrier.wait();
132         TestCPUUserTime(2);
133     });
134 
135     barrier.wait();
136 
137     test_gr.wait();
138 }
139 
140 //! \brief \ref error_guessing
141 TEST_CASE("Test that task was executed p times") {
142     tbb::detail::d1::wait_context wait(1);
143     tbb::task_group_context test_context;
144     CountingTask<> test_task(wait);
145 
146     constexpr std::size_t iter_counter = 10000;
147     for (std::size_t i = 0; i < iter_counter; ++i) {
148         tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
149         wait.reserve(1);
150     }
151 
152     wait.release(1);
153 
154     REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times");
155     REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
156     CountingTask<>::reset();
157 }
158 
159 #if TBB_USE_EXCEPTIONS
160 //! \brief \ref error_guessing
161 TEST_CASE("Test cancellation on exception") {
162     test_cancellation_on_exception(/*reset_ctx = */true);
163     test_cancellation_on_exception(/*reset_ctx = */false);
164 }
165 #endif // TBB_USE_EXCEPTIONS
166 
167 //! \brief \ref error_guessing
168 TEST_CASE("Simple test parallelism usage") {
169     std::size_t threads_num = utils::get_platform_max_threads();
170     utils::SpinBarrier barrier(threads_num);
171 
172     auto barrier_wait = [&barrier] {
173         barrier.wait();
174     };
175 
176     tbb::detail::d1::wait_context wait(threads_num);
177     tbb::detail::d1::task_group_context test_context;
178     using task_type = CountingTask<decltype(barrier_wait)>;
179 
180     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
181 
182     constexpr std::size_t iter_counter = 100;
183     for (std::size_t i = 0; i < iter_counter; ++i) {
184         for (std::size_t j = 0; j < threads_num; ++j) {
185             tbb::detail::d1::spawn(vector_test_task[j], test_context);
186         }
187         tbb::detail::d1::wait(wait, test_context);
188         wait.reserve(threads_num);
189     }
190     wait.release(threads_num);
191 
192     REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
193     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
194     task_type::reset();
195 }
196 
197 //! \brief \ref error_guessing
198 TEST_CASE("Test parallelism usage with parallel_for") {
199     std::size_t task_threads_num = utils::get_platform_max_threads();
200     utils::SpinBarrier barrier(task_threads_num);
201 
202     auto barrier_wait = [&barrier] {
203         barrier.wait();
204     };
205 
206     std::size_t pfor_iter_count = 10000;
207     std::atomic<std::size_t> pfor_counter(0);
208 
209     auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
210         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
211                           [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
212                               for (auto it = range.begin(); it != range.end(); ++it) {
213                                   ++pfor_counter;
214                               }
215                            }
216         );
217     };
218 
219     tbb::detail::d1::wait_context wait(task_threads_num);
220     tbb::detail::d1::task_group_context test_context;
221     using task_type = CountingTask<decltype(barrier_wait)>;
222     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));
223 
224     constexpr std::size_t iter_count = 10;
225     constexpr std::size_t pfor_threads_num = 4;
226     for (std::size_t i = 0; i < iter_count; ++i) {
227         std::vector<std::thread> pfor_threads;
228 
229         for (std::size_t j = 0; j < task_threads_num; ++j) {
230             tbb::detail::d1::spawn(vector_test_task[j], test_context);
231         }
232 
233         for (std::size_t k = 0; k < pfor_threads_num; ++k) {
234             pfor_threads.emplace_back(parallel_for_func);
235         }
236 
237         tbb::detail::d1::wait(wait, test_context);
238 
239         for (auto& thread : pfor_threads) {
240             if (thread.joinable()) {
241                 thread.join();
242             }
243         }
244 
245         wait.reserve(task_threads_num);
246     }
247     wait.release(task_threads_num);
248 
249     REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
250     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
251     REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished");
252     task_type::reset();
253 }
254 
255 //! \brief \ref error_guessing
256 TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
257     std::size_t threads_num = utils::get_platform_max_threads();
258     utils::SpinBarrier barrier(threads_num);
259 
260     auto barrier_wait = [&barrier] {
261         barrier.wait();
262     };
263 
264     tbb::detail::d1::wait_context wait(threads_num);
265     tbb::detail::d1::task_group_context test_context;
266     using task_type = CountingTask<decltype(barrier_wait)>;
267     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
268 
269     auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
270         tbb::detail::d1::spawn(vector_test_task[idx], test_context);
271     };
272 
273     constexpr std::size_t iter_count = 10;
274     for (std::size_t i = 0; i < iter_count; ++i) {
275         std::vector<std::thread> threads;
276 
277         for (std::size_t k = 0; k < threads_num - 1; ++k) {
278             threads.emplace_back(thread_func, k);
279         }
280 
281         for (auto& thread : threads) {
282             if (thread.joinable()) {
283                 thread.join();
284             }
285         }
286 
287         tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
288         wait.reserve(threads_num);
289     }
290     wait.release(threads_num);
291 
292     REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
293     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
294     task_type::reset();
295 }
296 
297 class SpawningTaskBody;
298 
299 using SpawningTask = CountingTask<SpawningTaskBody>;
300 
301 class SpawningTaskBody {
302 public:
303     using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;
304 
305     SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
306         : my_task_pool(task_pool), my_test_ctx(test_ctx) {}
307 
308     void operator()() const {
309         std::size_t delta = 7;
310         std::size_t start_idx = my_current_task.fetch_add(delta);
311 
312         if (start_idx < my_task_pool.size()) {
313             for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
314                 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
315             }
316         }
317     }
318 private:
319     task_pool_type& my_task_pool;
320     tbb::task_group_context& my_test_ctx;
321     static std::atomic<std::size_t> my_current_task;
322 }; // class SpawningTaskBody
323 
324 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);
325 
326 //! \brief \ref error_guessing
327 TEST_CASE("Actively adding tasks") {
328     std::size_t task_number = 500 * utils::get_platform_max_threads();
329 
330     tbb::detail::d1::wait_context wait(task_number + 1);
331     tbb::task_group_context test_context;
332 
333     SpawningTaskBody::task_pool_type task_pool;
334 
335     SpawningTaskBody task_body{task_pool, test_context};
336     for (std::size_t i = 0; i < task_number; ++i) {
337         task_pool.emplace_back(task_body, wait);
338     }
339 
340     SpawningTask first_task(task_body, wait);
341     tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);
342 
343     REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right?
344     REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
345 }
346 
347 #if __TBB_RESUMABLE_TASKS
348 
349 struct suspended_task : public tbb::detail::d1::task {
350 
351     suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
352         : my_suspend_tag(tag), my_wait(wait)
353     {}
354 
355     task* execute(tbb::detail::d1::execution_data&) override {
356         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
357             [] (const tbb::blocked_range<std::size_t>& range) {
358                 // Make some heavy work
359                 std::atomic<int> sum{};
360                 for (auto it = range.begin(); it != range.end(); ++it) {
361                     ++sum;
362                 }
363             },
364             tbb::static_partitioner{}
365         );
366 
367         my_wait.release();
368         tbb::task::resume(my_suspend_tag);
369         return nullptr;
370     }
371 
372     task* cancel(tbb::detail::d1::execution_data&) override {
373         FAIL("The function should never be called.");
374         return nullptr;
375     }
376 
377     tbb::task::suspend_point my_suspend_tag;
378     tbb::detail::d1::wait_context& my_wait;
379 };
380 
381 //! \brief \ref error_guessing
382 TEST_CASE("Isolation + resumable tasks") {
383     std::atomic<int> suspend_flag{};
384     tbb::task_group_context test_context;
385 
386     std::atomic<int> suspend_count{};
387     std::atomic<int> resume_count{};
388 
389     tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
390         [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
391             int ticket = 0;
392             for (auto it = range.begin(); it != range.end(); ++it) {
393                 ticket = suspend_flag++;
394             }
395 
396             if (ticket % 5 == 0) {
397                 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
398                 tbb::detail::d1::wait_context wait(1);
399                 ++suspend_count;
400                 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
401                     tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) {
402                         test_task.emplace_back(tag, wait);
403                         tbb::detail::d1::spawn(test_task[0], test_context);
404                     });
405                     }
406                 );
407                 tbb::detail::d1::wait(wait, test_context);
408                 ++resume_count;
409             }
410         }
411     );
412 
413     CHECK(suspend_count == resume_count);
414 }
415 
416 struct bypass_task : public tbb::detail::d1::task {
417     using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;
418 
419     bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
420                 std::atomic<bool>& resume_flag, tbb::task::suspend_point& suspend_tag)
421         : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
422     {}
423 
424     task* execute(tbb::detail::d1::execution_data&) override {
425         std::atomic<int> sum{};
426 
427         // Make some heavy work
428         for (std::size_t i = 0; i < 100000; ++i) {
429             ++sum;
430         }
431 
432         my_wait.release();
433 
434         if (my_resume_flag.exchange(false)) {
435             tbb::task::resume(my_suspend_tag);
436         }
437 
438         std::size_t ticket = my_current_task++;
439         return ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;
440     }
441 
442     task* cancel(tbb::detail::d1::execution_data&) override {
443         FAIL("The function should never be called.");
444         return nullptr;
445     }
446 
447     tbb::detail::d1::wait_context& my_wait;
448     task_pool_type& my_task_pool;
449     std::atomic<bool>& my_resume_flag;
450     tbb::task::suspend_point& my_suspend_tag;
451     static std::atomic<int> my_current_task;
452 };
453 
454 std::atomic<int> bypass_task::my_current_task(0);
455 
456 thread_local int test_tls = 0;
457 
458 //! \brief \ref error_guessing
459 TEST_CASE("Bypass suspended by resume") {
460     std::size_t task_number = 500 * utils::get_platform_max_threads();
461     tbb::task_group_context test_context;
462     tbb::detail::d1::wait_context wait(task_number + 1);
463 
464     test_tls = 1;
465 
466     std::atomic<bool> resume_flag{false};
467     tbb::task::suspend_point test_suspend_tag;
468 
469     std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;
470 
471     for (std::size_t i = 0; i < task_number; ++i) {
472         test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
473     }
474 
475     for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
476         tbb::detail::d1::spawn(test_task_pool[bypass_task::my_current_task++], test_context);
477     }
478 
479     auto suspend_func = [&resume_flag, &test_suspend_tag] {
480         tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
481             test_suspend_tag = tag;
482             resume_flag.store(true, std::memory_order_release);
483         });
484     };
485     using task_type = CountingTask<decltype(suspend_func)>;
486     task_type suspend_task(suspend_func, wait);
487 
488     tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
489     CHECK(bypass_task::my_current_task >= test_task_pool.size());
490     REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
491 }
492 
493 //! \brief \ref error_guessing
494 TEST_CASE("Critical tasks + resume") {
495     std::size_t task_number = 500 * utils::get_platform_max_threads();
496 
497     tbb::task_group_context test_context;
498     tbb::detail::d1::wait_context wait(task_number);
499 
500     tbb::task_arena test_arena;
501 
502     test_arena.initialize();
503 
504     std::atomic<bool> resume_flag{};
505     tbb::task::suspend_point test_suspend_tag;
506 
507     auto task_body = [&resume_flag, &test_suspend_tag] {
508         std::atomic<int> sum{};
509 
510         // Make some work
511         for (std::size_t i = 0; i < 1000; ++i) {
512             ++sum;
513         }
514 
515         if (resume_flag.exchange(false)) {
516             tbb::task::resume(test_suspend_tag);
517         }
518     };
519 
520     using task_type = CountingTask<decltype(task_body)>;
521     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
522 
523     for (std::size_t i = 0; i < task_number; ++i) {
524         test_tasks.emplace_back(task_body, wait);
525     }
526 
527     for (std::size_t i = 0; i < task_number / 2; ++i) {
528         submit(test_tasks[i], test_arena, test_context, true);
529     }
530 
531     auto suspend_func = [&resume_flag, &test_suspend_tag] {
532         tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) {
533             test_suspend_tag = tag;
534             resume_flag.store(true, std::memory_order_release);
535         });
536     };
537     using suspend_task_type = CountingTask<decltype(suspend_func)>;
538     suspend_task_type suspend_task(suspend_func, wait);
539 
540     submit(suspend_task, test_arena, test_context, true);
541 
542     test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] {
543     tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] {
544         tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1),
545             [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
546                 for (std::size_t i = range.begin(); i != range.end(); ++i) {
547                     submit(test_tasks[i], test_arena, test_context, true);
548                 }
549             });
550         });
551     });
552 
553     tbb::detail::d1::wait(wait, test_context);
554 }
555 
556 //! \brief \ref error_guessing
557 TEST_CASE("Stress testing") {
558     std::size_t task_number = utils::get_platform_max_threads();
559 
560     tbb::task_group_context test_context;
561     tbb::detail::d1::wait_context wait(task_number);
562 
563     tbb::task_arena test_arena;
564 
565     test_arena.initialize();
566 
567     auto task_body = [] {
568         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
569             std::atomic<int> sum{};
570             // Make some work
571             for (std::size_t i = 0; i < 100; ++i) {
572                 ++sum;
573             }
574         });
575     };
576     using task_type = CountingTask<decltype(task_body)>;
577 
578     std::size_t iter_counter = 20;
579 
580     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
581 
582     for (std::size_t j = 0; j < task_number; ++j) {
583         test_tasks.emplace_back(task_body, wait);
584     }
585 
586     test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
587         for (std::size_t i = 0; i < iter_counter; ++i) {
588 
589             for (std::size_t j = 0; j < task_number; ++j) {
590                 test_arena.enqueue(task_body);
591             }
592 
593             for (std::size_t j = 0; j < task_number / 2; ++j) {
594                 tbb::detail::d1::spawn(test_tasks[j], test_context);
595             }
596 
597             for (std::size_t j = task_number / 2; j < task_number; ++j) {
598                 submit(test_tasks[j], test_arena, test_context, true);
599             }
600 
601             tbb::detail::d1::wait(wait, test_context);
602             wait.reserve(task_number);
603         }
604         wait.release(task_number);
605     });
606 
607 
608     REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
609     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
610 }
611 
612 //! \brief \ref error_guessing
613 TEST_CASE("All workers sleep") {
614     std::size_t thread_number = utils::get_platform_max_threads();
615     tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;
616 
617     tbb::task_group test_gr;
618 
619     utils::SpinBarrier barrier(thread_number);
620     auto resumble_task = [&] {
621         barrier.wait();
622         tbb::task::suspend([&] (tbb::task::suspend_point sp) {
623             suspend_points.push_back(sp);
624             barrier.wait();
625         });
626     };
627 
628     for (std::size_t i = 0; i < thread_number - 1; ++i) {
629         test_gr.run(resumble_task);
630     }
631 
632     barrier.wait();
633     barrier.wait();
634     TestCPUUserTime(thread_number);
635 
636     for (auto sp : suspend_points)
637         tbb::task::resume(sp);
638     test_gr.wait();
639 }
640 
641 #endif // __TBB_RESUMABLE_TASKS
642 
643 //! \brief \ref error_guessing
644 TEST_CASE("Enqueue with exception") {
645     std::size_t task_number = 500 * utils::get_platform_max_threads();
646 
647     tbb::task_group_context test_context;
648     tbb::detail::d1::wait_context wait(task_number);
649 
650     tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};
651 
652     test_arena.initialize();
653 
654     auto task_body = [] {
655         std::atomic<int> sum{};
656         // Make some work
657         for (std::size_t i = 0; i < 100; ++i) {
658             ++sum;
659         }
660     };
661 
662     std::atomic<bool> end_flag{false};
663     auto check_body = [&end_flag] {
664         end_flag.store(true, std::memory_order_relaxed);
665     };
666 
667     using task_type = CountingTask<decltype(task_body)>;
668     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
669 
670     for (std::size_t j = 0; j < task_number; ++j) {
671         test_tasks.emplace_back(task_body, wait);
672     }
673 
674     {
675         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
676         test_arena.enqueue(task_body);
677         // Initialize implicit arena
678         tbb::parallel_for(0, 1, [] (int) {});
679         tbb::task_arena test_arena2(tbb::task_arena::attach{});
680         test_arena2.enqueue(task_body);
681     }
682 
683     constexpr std::size_t iter_count = 10;
684     for (std::size_t k = 0; k < iter_count; ++k) {
685         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
686         test_arena.enqueue(check_body);
687 
688         while (!end_flag.load(std::memory_order_relaxed)) ;
689 
690         utils::Sleep(1);
691         end_flag.store(false, std::memory_order_relaxed);
692 
693         test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
694             for (std::size_t j = 0; j < task_number; ++j) {
695                 tbb::detail::d1::spawn(test_tasks[j], test_context);
696             }
697 
698             tbb::detail::d1::wait(wait, test_context);
699             wait.reserve(task_number);
700         });
701     }
702     wait.release(task_number);
703 
704 
705     REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
706     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
707 }
708