/*
Copyright (c) 2005-2023 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include "common/test.h"
#include "common/utils.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"
#include "common/utils_concurrency_limit.h"
#include "common/cpu_usertime.h"

#include "tbb/task.h"
#include "tbb/task_group.h"
#include "tbb/parallel_for.h"
#include "tbb/cache_aligned_allocator.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_vector.h"

#include <atomic>
#include <thread>

//! \file test_task.cpp
//! \brief Test for [internal] functionality

struct EmptyBody {
    void operator()() const {}
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

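// Helper task used throughout this file: runs the given body, counts how many times
// instances were executed or canceled, and releases the associated wait_context.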
template <typename Body = EmptyBody>
class CountingTask : public tbb::detail::d1::task {
public:
    CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}

    CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}

    task* execute( tbb::detail::d1::execution_data& ) override {
        ++my_execute_counter;
        my_body();
        my_wait.release();
        return nullptr;
    }

    task* cancel( tbb::detail::d1::execution_data& ) override {
        ++my_cancel_counter;
        my_wait.release();
        return nullptr;
    }

    static void reset() {
        my_execute_counter = 0;
        my_cancel_counter = 0;
    }

    static std::size_t execute_counter() { return my_execute_counter; }
    static std::size_t cancel_counter() { return my_cancel_counter; }

private:
    Body my_body;
    tbb::detail::d1::wait_context& my_wait;

    static std::atomic<std::size_t> my_execute_counter;
    static std::atomic<std::size_t> my_cancel_counter;
}; // class CountingTask

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4702 is back

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);

template <typename Body>
std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);

#if TBB_USE_EXCEPTIONS
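// Repeatedly runs a task whose body throws and verifies the execute/cancel counters,
// with and without resetting the context between iterations.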
void test_cancellation_on_exception( bool reset_ctx ) {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    auto throw_body = [] {
        throw 1;
    };
    CountingTask<decltype(throw_body)> task(throw_body, wait);

    constexpr std::size_t iter_counter = 1000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        try {
            tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
        } catch (int ex) {
            REQUIRE(ex == 1);
        }
        if (reset_ctx) {
            test_context.reset();
        }
        wait.reserve(1);
    }
    wait.release(1);

    REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
    REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurred");
    task.reset();
}
#endif // TBB_USE_EXCEPTIONS

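//! Check that an external thread waiting for a task_group sleeps instead of spinning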
//! \brief \ref error_guessing
TEST_CASE("External threads sleep") {
    if (utils::get_platform_max_threads() < 2) return;
    utils::SpinBarrier barrier(2);

    tbb::task_group test_gr;

    test_gr.run([&] {
        barrier.wait();
        TestCPUUserTime(2);
    });

    barrier.wait();

    test_gr.wait();
}

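//! Check that execute_and_wait runs the task exactly once per call and never calls cancel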
//! \brief \ref error_guessing
TEST_CASE("Test that task was executed p times") {
    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    CountingTask<> test_task(wait);

    constexpr std::size_t iter_counter = 10000;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
        wait.reserve(1);
    }

    wait.release(1);

    REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed the expected number of times");
    REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
    CountingTask<>::reset();
}

#if TBB_USE_EXCEPTIONS
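//! Check cancellation of a throwing task, with and without resetting the context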
//! \brief \ref error_guessing
TEST_CASE("Test cancellation on exception") {
    test_cancellation_on_exception(/*reset_ctx = */true);
    test_cancellation_on_exception(/*reset_ctx = */false);
}
#endif // TBB_USE_EXCEPTIONS

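//! Spawn one task per thread, each blocking on a shared barrier, so an iteration completes only if all tasks run concurrently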
//! \brief \ref error_guessing
TEST_CASE("Simple test parallelism usage") {
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_counter = 100;
    for (std::size_t i = 0; i < iter_counter; ++i) {
        for (std::size_t j = 0; j < threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }
        tbb::detail::d1::wait(wait, test_context);
        wait.reserve(threads_num);
    }
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}

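//! Check that spawned tasks make progress while separate std::threads run their own parallel_for loops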
//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with parallel_for") {
    std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(task_threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    std::size_t pfor_iter_count = 10000;
    std::atomic<std::size_t> pfor_counter(0);

    auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
            [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
                for (auto it = range.begin(); it != range.end(); ++it) {
                    ++pfor_counter;
                }
            }
        );
    };

    tbb::detail::d1::wait_context wait(task_threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));

    constexpr std::size_t iter_count = 10;
    constexpr std::size_t pfor_threads_num = 4;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> pfor_threads;

        for (std::size_t j = 0; j < task_threads_num; ++j) {
            tbb::detail::d1::spawn(vector_test_task[j], test_context);
        }

        for (std::size_t k = 0; k < pfor_threads_num; ++k) {
            pfor_threads.emplace_back(parallel_for_func);
        }

        tbb::detail::d1::wait(wait, test_context);

        for (auto& thread : pfor_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        wait.reserve(task_threads_num);
    }
    wait.release(task_threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread did not finish");
    task_type::reset();
}

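//! Check that tasks spawned from several external std::threads are all executed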
//! \brief \ref error_guessing
TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
    std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(threads_num);

    auto barrier_wait = [&barrier] {
        barrier.wait();
    };

    tbb::detail::d1::wait_context wait(threads_num);
    tbb::detail::d1::task_group_context test_context;
    using task_type = CountingTask<decltype(barrier_wait)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));

    auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
        tbb::detail::d1::spawn(vector_test_task[idx], test_context);
    };

    constexpr std::size_t iter_count = 10;
    for (std::size_t i = 0; i < iter_count; ++i) {
        std::vector<std::thread> threads;

        for (std::size_t k = 0; k < threads_num - 1; ++k) {
            threads.emplace_back(thread_func, k);
        }

        for (auto& thread : threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }

        tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
        wait.reserve(threads_num);
    }
    wait.release(threads_num);

    REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
    task_type::reset();
}

class SpawningTaskBody;

using SpawningTask = CountingTask<SpawningTaskBody>;

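// Task body that grabs the next chunk of tasks from a shared pool and spawns them,
// so new work keeps being added while the pool is processed.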
class SpawningTaskBody {
public:
    using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;

    SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
        : my_task_pool(task_pool), my_test_ctx(test_ctx) {}

    void operator()() const {
        std::size_t delta = 7;
        std::size_t start_idx = my_current_task.fetch_add(delta);

        if (start_idx < my_task_pool.size()) {
            for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
                tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
            }
        }
    }
private:
    task_pool_type& my_task_pool;
    tbb::task_group_context& my_test_ctx;
    static std::atomic<std::size_t> my_current_task;
}; // class SpawningTaskBody

std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);

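//! Check that tasks spawned while the waiting is in progress are all executed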
//! \brief \ref error_guessing
TEST_CASE("Actively adding tasks") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::detail::d1::wait_context wait(task_number + 1);
    tbb::task_group_context test_context;

    SpawningTaskBody::task_pool_type task_pool;

    SpawningTaskBody task_body{task_pool, test_context};
    for (std::size_t i = 0; i < task_number; ++i) {
        task_pool.emplace_back(task_body, wait);
    }

    SpawningTask first_task(task_body, wait);
    tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);

    REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right?
    REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
}

#if __TBB_RESUMABLE_TASKS
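// Task that does some work, releases its wait_context, and resumes the given suspend point.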
struct suspended_task : public tbb::detail::d1::task {

    suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
        : my_suspend_tag(tag), my_wait(wait)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
            [] (const tbb::blocked_range<std::size_t>& range) {
                // Do some heavy work
                std::atomic<int> sum{};
                for (auto it = range.begin(); it != range.end(); ++it) {
                    ++sum;
                }
            },
            tbb::static_partitioner{}
        );

        my_wait.release();
        tbb::task::resume(my_suspend_tag);
        return nullptr;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::task::suspend_point my_suspend_tag;
    tbb::detail::d1::wait_context& my_wait;
};

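//! Check that tbb::task::suspend works inside an isolated region: every suspended task is eventually resumed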
//! \brief \ref error_guessing
TEST_CASE("Isolation + resumable tasks") {
    std::atomic<int> suspend_flag{};
    tbb::task_group_context test_context;

    std::atomic<int> suspend_count{};
    std::atomic<int> resume_count{};

    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
        [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
            int ticket = 0;
            for (auto it = range.begin(); it != range.end(); ++it) {
                ticket = suspend_flag++;
            }

            if (ticket % 5 == 0) {
                std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
                tbb::detail::d1::wait_context wait(1);
                ++suspend_count;
                tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
                    auto thread_id = std::this_thread::get_id();
                    tbb::task::suspend([&wait, &test_context, &test_task, thread_id] (tbb::task::suspend_point tag) {
                        CHECK(thread_id == std::this_thread::get_id());
                        test_task.emplace_back(tag, wait);
                        tbb::detail::d1::spawn(test_task[0], test_context);
                    });
                });
                tbb::detail::d1::wait(wait, test_context);
                ++resume_count;
            }
        }
    );

    CHECK(suspend_count == resume_count);
}

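// Task that resumes the suspended thread (at most once) and bypasses the next task from the pool.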
struct bypass_task : public tbb::detail::d1::task {
    using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;

    bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
                std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
        : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
    {}

    task* execute(tbb::detail::d1::execution_data&) override {
        utils::doDummyWork(10000);

        int expected = 1;
        if (my_resume_flag.compare_exchange_strong(expected, 2)) {
            tbb::task::resume(my_suspend_tag);
        }

        std::size_t ticket = my_current_task++;
        task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;

        if (!next && my_resume_flag != 2) {
            // Rarely, all tasks can be executed before the suspend point is set,
            // so wait for the suspend before leaving.
            utils::SpinWaitWhileEq(my_resume_flag, 0);
            expected = 1;
            if (my_resume_flag.compare_exchange_strong(expected, 2)) {
                tbb::task::resume(my_suspend_tag);
            }
        }

        my_wait.release();
        return next;
    }

    task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }

    tbb::detail::d1::wait_context& my_wait;
    task_pool_type& my_task_pool;
    std::atomic<int>& my_resume_flag;
    tbb::task::suspend_point& my_suspend_tag;
    static std::atomic<int> my_current_task;
};

std::atomic<int> bypass_task::my_current_task(0);

thread_local int test_tls = 0;

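//! Check that task bypassing interoperates with suspend/resume and that execute_and_wait returns on the original thread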
//! \brief \ref error_guessing
TEST_CASE("Bypass suspended by resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number + 1);

    test_tls = 1;

    std::atomic<int> resume_flag{0};
    tbb::task::suspend_point test_suspend_tag;

    std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
    }

    for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
        std::size_t ticket = bypass_task::my_current_task++;
        if (ticket < test_task_pool.size()) {
            tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
        }
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        auto thread_id = std::this_thread::get_id();
        tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
            CHECK(thread_id == std::this_thread::get_id());
            test_suspend_tag = tag;
            resume_flag = 1;
        });
    };
    using task_type = CountingTask<decltype(suspend_func)>;
    task_type suspend_task(suspend_func, wait);

    tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
    CHECK(bypass_task::my_current_task >= test_task_pool.size());
    REQUIRE_MESSAGE(test_tls == 1, "execute_and_wait returned on the wrong thread");
}

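//! Check that critical task submission interoperates with suspend/resume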
//! \brief \ref error_guessing
TEST_CASE("Critical tasks + resume") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait{ 0 };

    // The test requires at least one worker thread in test_arena
    int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads()));
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena);
    tbb::task_arena test_arena(num_threads_in_test_arena);

    test_arena.initialize();

    std::atomic<bool> resume_flag{}, resumed{};
    tbb::task::suspend_point test_suspend_tag;

    auto task_body = [&resume_flag, &resumed, &test_suspend_tag] {
        // Do some work
        utils::doDummyWork(1000);

        if (resume_flag.exchange(false)) {
            tbb::task::resume(test_suspend_tag);
            resumed = true;
        }
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t i = 0; i < task_number; ++i) {
        test_tasks.emplace_back(task_body, wait);
    }

    wait.reserve(task_number / 2);
    for (std::size_t i = 0; i < task_number / 2; ++i) {
        submit(test_tasks[i], test_arena, test_context, true);
    }

    auto suspend_func = [&resume_flag, &test_suspend_tag] {
        auto thread_id = std::this_thread::get_id();
        tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
            CHECK(thread_id == std::this_thread::get_id());
            test_suspend_tag = tag;
            resume_flag.store(true, std::memory_order_release);
        });
    };
    using suspend_task_type = CountingTask<decltype(suspend_func)>;
    suspend_task_type suspend_task(suspend_func, wait);

    wait.reserve(1);
    submit(suspend_task, test_arena, test_context, true);

    test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
        tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
            do {
                wait.reserve(task_number / 2);
                tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number),
                    [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
                        for (std::size_t i = range.begin(); i != range.end(); ++i) {
                            submit(test_tasks[i], test_arena, test_context, true);
                        }
                    }
                );
            } while (!resumed);
        });
    });

    test_arena.execute([&wait, &test_context] {
        tbb::detail::d1::wait(wait, test_context);
    });
}

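//! Stress test mixing enqueue, spawn, and critical task submission in a single arena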
//! \brief \ref error_guessing
TEST_CASE("Stress testing") {
    std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena;

    test_arena.initialize();

    auto task_body = [] {
        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
            utils::doDummyWork(100);
        });
    };
    using task_type = CountingTask<decltype(task_body)>;

    std::size_t iter_counter = 20;

    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
        for (std::size_t i = 0; i < iter_counter; ++i) {

            for (std::size_t j = 0; j < task_number; ++j) {
                test_arena.enqueue(task_body);
            }

            for (std::size_t j = 0; j < task_number / 2; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            for (std::size_t j = task_number / 2; j < task_number; ++j) {
                submit(test_tasks[j], test_arena, test_context, true);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        }
        wait.release(task_number);
    });

    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}

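//! Check that worker threads whose tasks have been suspended sleep instead of spinning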
//! \brief \ref error_guessing
TEST_CASE("All workers sleep") {
    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;

    tbb::task_group test_gr;

    utils::SpinBarrier barrier(thread_number);
    auto resumable_task = [&] {
        barrier.wait();
        auto thread_id = std::this_thread::get_id();
        tbb::task::suspend([&] (tbb::task::suspend_point sp) {
            CHECK(thread_id == std::this_thread::get_id());
            suspend_points.push_back(sp);
            barrier.wait();
        });
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        test_gr.run(resumable_task);
    }

    barrier.wait();
    barrier.wait();
    TestCPUUserTime(thread_number);

    for (auto sp : suspend_points)
        tbb::task::resume(sp);
    test_gr.wait();
}

#endif // __TBB_RESUMABLE_TASKS

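//! Check enqueueing into explicit and attached arenas while max_allowed_parallelism is limited to 1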
//! \brief \ref error_guessing
TEST_CASE("Enqueue with exception") {
    std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(task_number);

    tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};

    test_arena.initialize();

    auto task_body = [] {
        utils::doDummyWork(100);
    };

    std::atomic<bool> end_flag{false};
    auto check_body = [&end_flag] {
        end_flag.store(true, std::memory_order_relaxed);
    };

    using task_type = CountingTask<decltype(task_body)>;
    std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;

    for (std::size_t j = 0; j < task_number; ++j) {
        test_tasks.emplace_back(task_body, wait);
    }

    {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(task_body);
        // Initialize implicit arena
        tbb::parallel_for(0, 1, [] (int) {});
        tbb::task_arena test_arena2(tbb::task_arena::attach{});
        test_arena2.enqueue(task_body);
    }

    constexpr std::size_t iter_count = 10;
    for (std::size_t k = 0; k < iter_count; ++k) {
        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
        test_arena.enqueue(check_body);

        while (!end_flag.load(std::memory_order_relaxed)) ;

        utils::Sleep(1);
        end_flag.store(false, std::memory_order_relaxed);

        test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
            for (std::size_t j = 0; j < task_number; ++j) {
                tbb::detail::d1::spawn(test_tasks[j], test_context);
            }

            tbb::detail::d1::wait(wait, test_context);
            wait.reserve(task_number);
        });
    }
    wait.release(task_number);

    REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
}

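// Task that keeps resubmitting itself as a critical task until its counter is exhausted.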
struct resubmitting_task : public tbb::detail::d1::task {
    tbb::task_arena& my_arena;
    tbb::task_group_context& my_ctx;
    std::atomic<int> counter{100000};

    resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
    {}

    tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
        if (counter-- > 0) {
            submit(*this, my_arena, my_ctx, true);
        }
        return nullptr;
    }

    tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override {
        FAIL("The function should never be called.");
        return nullptr;
    }
};

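//! Check that critical tasks submitted by high-priority external threads complete while arena workers are kept busy (priority inversion scenario)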
//! \brief \ref error_guessing
TEST_CASE("Test with priority inversion") {
    if (!utils::can_change_thread_priority()) return;

    std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);

    tbb::task_arena test_arena(2 * thread_number, thread_number);
    test_arena.initialize();
    utils::pinning_observer obsr(test_arena);
    CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");

    std::uint32_t critical_task_counter = 1000 * thread_number;
    std::atomic<std::size_t> task_counter{0};

    tbb::task_group_context test_context;
    tbb::detail::d1::wait_context wait(critical_task_counter);

    auto critical_work = [&] {
        utils::doDummyWork(10);
    };

    using suspend_task_type = CountingTask<decltype(critical_work)>;
    suspend_task_type critical_task(critical_work, wait);

    auto high_priority_thread_func = [&] {
        // Increase the external thread's priority
        utils::increased_priority_guard guard{};
        utils::suppress_unused_warning(guard);
        // Pin the external thread
        test_arena.execute([]{});
        while (task_counter++ < critical_task_counter) {
            submit(critical_task, test_arena, test_context, true);
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    };

    resubmitting_task worker_task(test_arena, test_context);
    // Warm up: take the first core on execute
    utils::SpinBarrier barrier(thread_number + 1);
    test_arena.execute([&] {
        tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t) {
            barrier.wait();
            submit(worker_task, test_arena, test_context, true);
        });
    });

    std::vector<std::thread> high_priority_threads;
    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads.emplace_back(high_priority_thread_func);
    }

    utils::increased_priority_guard guard{};
    utils::suppress_unused_warning(guard);
    while (task_counter++ < critical_task_counter) {
        submit(critical_task, test_arena, test_context, true);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    tbb::detail::d1::wait(wait, test_context);

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        high_priority_threads[i].join();
    }
}

// Explicit test for the raii_guard move ctor because of copy elision optimization
// TODO: consider a better test file for this test case
//! \brief \ref interface
TEST_CASE("raii_guard move ctor") {
    int count{0};
    auto func = [&count] {
        count++;
        CHECK(count == 1);
    };

    tbb::detail::d0::raii_guard<decltype(func)> guard1(func);
    tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1));
}

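//! Check that an arena with enqueued work can be destroyed and the scheduler finalized without hangs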
//! \brief \ref error_guessing
TEST_CASE("Check correct arena destruction with enqueue") {
    for (int i = 0; i < 100; ++i) {
        tbb::task_scheduler_handle handle{ tbb::attach{} };
        {
            tbb::task_arena a(2, 0);

            a.enqueue([] {
                tbb::parallel_for(0, 100, [] (int) { std::this_thread::sleep_for(std::chrono::nanoseconds(10)); });
            });
            std::this_thread::sleep_for(std::chrono::microseconds(1));
        }
        tbb::finalize(handle, std::nothrow_t{});
    }
}