/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/concurrency_tracker.h"
#include "common/cpu_usertime.h"
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/task_group.h"

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <stdexcept>
#include <thread>
#include <vector>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification
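//
// Overview of what follows (in rough order): explicit initialization/termination of arenas,
// concurrent use of several arenas with arena observers, multiple external threads sharing one
// arena, consistency of arena entry (TestArenaEntryConsistency), achieving the requested
// concurrency (TestArenaConcurrency), attach semantics, this_task_arena::isolate, delegated
// spawn/wait, functor move semantics and execute()/isolate() return values, worker migration
// between arenas, and default worker limits.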

//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change those.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
            default: {
                tbb::task_arena arena( std::rand() % maxthread + 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize();
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
            case 0: {
                tbb::task_arena arena( 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                CHECK(arena.is_active());
                break;
            }
            case 1: {
                tbb::task_arena arena( tbb::task_arena::automatic );
                CHECK(!arena.is_active());
                arena.initialize();
                CHECK(arena.is_active());
                break;
            }
            case 2: {
                tbb::task_arena arena;
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 );
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
        }
    }
}

//--------------------------------------------------//

// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}

class ArenaObserver : public tbb::task_scheduler_observer {
    int myId; // unique observer/arena id within a test
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        observe(false);
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        utils::doDummyWork(50000);
    }
};

struct AsynchronousWork {
    utils::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
        : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
        if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
        else my_barrier.signalNoWait();
    }
};

//-----------------------------------------------------------------------------------------//

// Test that task_arenas might be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of the application thread running this test.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that arena observer can be activated before local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
        LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
            observe(true);
        }
        ~LocalObserver() {
            observe(false);
        }
    };

    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");

    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");

    LocalObserver lo1;
    CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated");

    tbb::task_arena a3(1, 0);
    LocalObserver lo2(a3);
    CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated");

    utils::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);

    a1.enqueue(work); // put async work
    barrier.wait();

    a2.enqueue(work); // another work
    a2.execute(work);

    a3.execute([] {
        utils::doDummyWork(100);
    });

    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    // TODO REVAMP fix with global control
    ResetTLS();
    utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
}
//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait();
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            utils::doDummyWork(10000);
            myWait.release();
        }
    };

    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0);
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve();
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait));
        }
        my_b.wait();
    }
};

void TestMultipleMasters(int p) {
    {
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. A worker thread needs time to occupy the slot to prevent an external thread from taking the enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TODO: explain what TestArenaEntryConsistency does
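// Roughly, based on the code below: TestArenaEntryConsistency exercises the different ways a call
// can enter a task_arena(2,1) via execute() -- directly, delegated to a worker, delegated back to
// an external thread, and nested from inside the arena in the same or in a different
// task_group_context -- and checks that the FP control settings captured by the arena are seen
// inside execute(), restored outside of it, and that exceptions thrown by the body propagate to
// the caller.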
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance
        : FPModeContext(idx+i)
        , my_stage(s)
        , is_caught(false)
#if TBB_USE_EXCEPTIONS
        , is_expected( (idx&(1<<i)) != 0 )
#else
        , is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduce the entry type and append it to the id; this helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                  "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                  : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode);
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
        : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in current task group context
        tbb::parallel_for(4, 5, *this); // in different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        GetRoundingMode();
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    tbb::task_arena a(2, 1);
    std::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings to arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types
        body.test(i);
}
//--------------------------------------------------
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    utils::SpinBarrier *my_barrier;
    utils::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
        : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        REQUIRE( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        REQUIRE( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        REQUIRE( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms,
            // which makes it impossible to create more than ~500 threads.
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}

struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
    utils::SpinBarrier& m_barrier;

    TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), m_barrier(barrier) {
        observe(true);
    }
    ~TestMandatoryConcurrencyObserver() {
        observe(false);
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait();
        }
    }
};

void TestMandatoryConcurrency() {
    tbb::task_arena a(1);
    a.execute([&a] {
        int n_threads = 4;
        utils::SpinBarrier exit_barrier(2);
        TestMandatoryConcurrencyObserver observer(a, exit_barrier);
        for (int j = 0; j < 5; ++j) {
            utils::ExactConcurrencyLevel::check(1);
            std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
            utils::SpinBarrier barrier(n_threads);
            utils::NativeParallelFor(n_threads, [&](int) {
                for (int i = 0; i < 5; ++i) {
                    barrier.wait();
                    a.enqueue([&] {
                        CHECK(tbb::this_task_arena::max_concurrency() == 2);
                        CHECK(a.max_concurrency() == 2);
                        ++curr_tasks;
                        CHECK(curr_tasks == 1);
                        utils::doDummyWork(1000);
                        CHECK(curr_tasks == 1);
                        --curr_tasks;
                        ++num_tasks;
                    });
                    barrier.wait();
                }
            });
            do {
                exit_barrier.wait();
            } while (num_tasks < n_threads * 5);
        }
    });
}

void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
    TestMandatoryConcurrency();
    InitializeAndTerminate(max_thread_num);
    for (int p = min_thread_num; p <= max_thread_num; ++p) {
        TestConcurrentArenas(p);
        TestMultipleMasters(p);
        TestArenaConcurrency(p);
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test uses the knowledge of task_arena internals
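// A note on attach semantics as exercised below: constructing tbb::task_arena with
// tbb::task_arena::attach(), or calling initialize(tbb::attach()), binds the object to the arena
// the calling thread currently occupies; if the thread is in no arena, the object remains
// non-active, just like a default-constructed, uninitialized task_arena.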

struct TaskArenaValidator {
    int my_slot_at_construction;
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                       "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}

struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        tbb::task_arena arena{tbb::task_arena::attach()};
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        tbb::task_arena arena3;
        arena3.initialize(tbb::attach());
        ValidateAttachedArena( arena3, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2{tbb::task_arena::attach()};
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};

thread_local int TestAttachBody::my_idx;

void TestAttach( int maxthread ) {
    // Externally concurrent, but no concurrency within a thread
    utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}

//--------------------------------------------------//

// Test that task_arena::enqueue invokes only the const overload of the functor's call operator.
// TODO: can it be reworked as SFINAE-based compile-time check?
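// One possible direction for such a check (a hedged sketch only, not used by this test): pass a
// functor whose non-const call operator is deleted, so the commented line below compiles only if
// the implementation invokes the functor through a const path.
//
//     struct ConstOnlyFunctor {
//         void operator()() = delete;   // must never be selected by enqueue()
//         void operator()() const {}
//     };
//     // tbb::task_arena{}.enqueue( ConstOnlyFunctor{} ); // would act as a compile-time check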
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
}

//--------------------------------------------------//

#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"

// Test this_task_arena::isolate
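// (Background for the tests in this namespace: this_task_arena::isolate(f) runs f() so that a
// thread waiting inside the isolated region does not pick up tasks that originate outside of the
// region, while tasks spawned inside the region remain available to it.)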
namespace TestIsolatedExecuteNS {
template <typename NestedPartitioner>
class NestedParFor : utils::NoAssign {
public:
    NestedParFor() {}
    void operator()() const {
        NestedPartitioner p;
        tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
    }
};

template <typename NestedPartitioner>
class ParForBody : utils::NoAssign {
    bool myOuterIsolation;
    tbb::enumerable_thread_specific<int> &myEts;
    std::atomic<bool> &myIsStolen;
public:
    ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
        : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
    void operator()( int ) const {
        int &e = myEts.local();
        if ( e++ > 0 ) myIsStolen = true;
        if ( myOuterIsolation )
            NestedParFor<NestedPartitioner>()();
        else
            tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
        --e;
    }
};

template <typename OuterPartitioner, typename NestedPartitioner>
class OuterParFor : utils::NoAssign {
    bool myOuterIsolation;
    std::atomic<bool> &myIsStolen;
public:
    OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
    void operator()() const {
        tbb::enumerable_thread_specific<int> ets( 0 );
        OuterPartitioner p;
        tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
    }
};

template <typename OuterPartitioner, typename NestedPartitioner>
void TwoLoopsTest( bool outer_isolation ) {
    std::atomic<bool> is_stolen;
    is_stolen = false;
    const int max_repeats = 100;
    if ( outer_isolation ) {
        for ( int i = 0; i <= max_repeats; ++i ) {
            tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
            if ( is_stolen ) break;
        }
        // TODO: was ASSERT_WARNING
        if (!is_stolen) {
            REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
        }
    } else {
        for ( int i = 0; i <= max_repeats; ++i ) {
            OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
        }
        REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
    }
}

void TwoLoopsTest( bool outer_isolation ) {
    TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
    TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
    TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
    TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
}

void TwoLoopsTest() {
    TwoLoopsTest( true );
    TwoLoopsTest( false );
}
//--------------------------------------------------//
class HeavyMixTestBody : utils::NoAssign {
    tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
    tbb::enumerable_thread_specific<int>& myIsolatedLevel;
    int myNestedLevel;

    template <typename Partitioner, typename Body>
    static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = nullptr ) {
        if ( rnd.get() % 2 ) {
            if ( ctx )
                tbb::parallel_for( 0, 2, body, p, *ctx );
            else
                tbb::parallel_for( 0, 2, body, p );
        } else {
            tbb::parallel_invoke( body, body );
        }
    }

    template <typename Partitioner>
    class IsolatedBody : utils::NoAssign {
        const HeavyMixTestBody &myHeavyMixTestBody;
        Partitioner &myPartitioner;
    public:
        IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
            : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
        void operator()() const {
            RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                                  myHeavyMixTestBody.myNestedLevel + 1 ),
                myPartitioner );
        }
    };

    template <typename Partitioner>
    void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
        Partitioner p;
        switch ( rnd.get() % 2 ) {
        case 0: {
            // No features
            tbb::task_group_context ctx;
            RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
            break;
        }
        case 1: {
            // Isolation
            int previous_isolation = isolated_level;
            isolated_level = myNestedLevel;
            tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
            isolated_level = previous_isolation;
            break;
        }
        }
    }
public:
    HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
                      tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
        : myRandom( random ), myIsolatedLevel( isolated_level )
        , myNestedLevel( nested_level ) {}
    void operator()() const {
        int &isolated_level = myIsolatedLevel.local();
        CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" );
        if ( myNestedLevel == 20 )
            return;
        utils::FastRandom<>& rnd = myRandom.local();
        if ( rnd.get() % 2 == 1 ) {
            RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
        } else {
            RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
        }
    }
    void operator()(int) const {
        this->operator()();
    }
};

struct RandomInitializer {
    utils::FastRandom<> operator()() {
        return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
    }
};

void HeavyMixTest() {
    std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);

    RandomInitializer init_random;
    tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
    tbb::enumerable_thread_specific<int> isolated_level( 0 );
    for ( int i = 0; i < 5; ++i ) {
        HeavyMixTestBody b( random, isolated_level, 1 );
        b( 0 );
    }
}

//--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
struct MyException {};
struct IsolatedBodyThrowsException {
    void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
        // Workaround an unreachable code warning in task_arena_function.
        volatile bool workaround = true;
        if (workaround)
#endif
        {
            throw MyException();
        }
    }
};
struct ExceptionTestBody : utils::NoAssign {
    tbb::enumerable_thread_specific<int>& myEts;
    std::atomic<bool>& myIsStolen;
    ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
        : myEts( ets ), myIsStolen( is_stolen ) {}
    void operator()( int i ) const {
        try {
            tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
            REQUIRE_MESSAGE( false, "The exception has been lost" );
        }
        catch ( MyException ) {}
        catch ( ... ) {
            REQUIRE_MESSAGE( false, "Unexpected exception" );
        }
        // Check that nested algorithms can steal outer-level tasks
        int &e = myEts.local();
        if ( e++ > 0 ) myIsStolen = true;
        // work imbalance increases chances for stealing
        tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
        --e;
    }
};

#endif /* TBB_USE_EXCEPTIONS */
void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
    tbb::enumerable_thread_specific<int> ets;
    std::atomic<bool> is_stolen;
    is_stolen = false;
    for ( ;; ) {
        tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
        if ( is_stolen ) break;
    }
    REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
}

struct NonConstBody {
    unsigned int state;
    void operator()() {
        state ^= ~0u;
    }
};

void TestNonConstBody() {
    NonConstBody body;
    body.state = 0x6c97d5ed;
    tbb::this_task_arena::isolate(body);
    REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
}

// TODO: Consider tbb::task_group instead of explicit task API.
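// A rough idea of what a task_group-based variant could look like (a hedged sketch, not what the
// test does today): the explicit root task and wait_context would be replaced by
//
//     tbb::task_group tg;
//     tg.run([&] {
//         for (int i = 0; i < TestEnqueueTask::N; ++i)
//             arena.enqueue([&] { executed.local() = true; ++completed; });
//     });
//     tg.wait();                                             // waits only for tg's own work
//     while (completed < TestEnqueueTask::N) utils::yield(); // then for the enqueued tasks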
class TestEnqueueTask : public tbb::detail::d1::task {
    using wait_context = tbb::detail::d1::wait_context;

    tbb::enumerable_thread_specific<bool>& executed;
    std::atomic<int>& completed;

public:
    wait_context& waiter;
    tbb::task_arena& arena;
    static const int N = 100;

    TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
        : executed(exe), completed(c), waiter(w), arena(a) {}

    tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                for (int j = 0; j < 100; j++) utils::yield();
                waiter.release(1);
            });
        }
        return nullptr;
    }
    tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
};

class TestEnqueueIsolateBody : utils::NoCopy {
    tbb::enumerable_thread_specific<bool>& executed;
    std::atomic<int>& completed;
    tbb::task_arena& arena;
public:
    static const int N = 100;

    TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
        : executed(exe), completed(c), arena(a) {}
    void operator()() {
        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);

        TestEnqueueTask root(executed, completed, waiter, arena);
        tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
    }
};

void TestEnqueue() {
    tbb::enumerable_thread_specific<bool> executed(false);
    std::atomic<int> completed;
    tbb::task_arena arena{tbb::task_arena::attach()};

    // Check that the main thread can process enqueued tasks.
    completed = 0;
    TestEnqueueIsolateBody b1(executed, completed, arena);
    b1();

    if (!executed.local()) {
        REPORT("Warning: Not a single enqueued task has been executed by the main thread.\n");
    }

    executed.local() = false;
    completed = 0;
    const int N = 100;
    // Create enqueued tasks out of isolation.

    tbb::task_group_context ctx;
    tbb::detail::d1::wait_context waiter(N);
    for (int i = 0; i < N; ++i) {
        arena.enqueue([&]() {
            executed.local() = true;
            ++completed;
            utils::yield();
            waiter.release(1);
        });
    }
    TestEnqueueIsolateBody b2(executed, completed, arena);
    tbb::this_task_arena::isolate(b2);
    REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

    tbb::detail::d1::wait(waiter, ctx);
    // while (completed < TestEnqueueTask::N + N) utils::yield();
}
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation when the owner steals
    // an outer-level task on a nested level. If there is only one thief, it will execute outer-level tasks
    // first and the owner will not get a chance to steal outer-level tasks.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] { tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
        tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log noisy enough that the warning gets treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
struct TestFunctor {
    void operator()() const {};
};

struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
    MoveOnlyFunctor() : utils::MoveOnly() {};
    MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
};

struct MovePreferableFunctor : utils::Movable, TestFunctor {
    MovePreferableFunctor() : utils::Movable() {};
    MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
    MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
};

struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
    NoMoveNoCopyFunctor() : utils::NoCopy() {};
    // the move constructor is disallowed, just like the copy constructor inherited from the NoCopy parent
private:
    NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
};

void TestFunctors() {
    tbb::task_arena ta;
    MovePreferableFunctor mpf;
    // execute() doesn't have any copies or moves of arguments inside the impl
    ta.execute( NoMoveNoCopyFunctor() );

    ta.enqueue( MoveOnlyFunctor() );
    ta.enqueue( mpf );
    REQUIRE_MESSAGE(mpf.alive, "object was moved when it was passed by lvalue");
    mpf.Reset();
    ta.enqueue( std::move(mpf) );
    REQUIRE_MESSAGE(!mpf.alive, "object was copied when it was passed by rvalue");
    mpf.Reset();
}
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
struct noDefaultTag {};
class ReturnType : public StateTrackable<> {
    static const int SIZE = 42;
    std::vector<int> data;
public:
    ReturnType(noDefaultTag) : StateTrackable<>(0) {}
    // Define copy constructor to test that it is never called
    ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
    ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

    void fill() {
        for (int i = 0; i < SIZE; ++i)
            data.push_back(i);
    }
    void check() {
        REQUIRE(data.size() == unsigned(SIZE));
        for (int i = 0; i < SIZE; ++i)
            REQUIRE(data[i] == i);
        StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
        REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
        REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
        std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
        std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
        REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
        // The number of copies/moves should not exceed 3 if copy elision takes place:
        // function return, store to an internal storage, acquire internal storage.
        // Without copy elision, this number may grow up to 7.
        REQUIRE((copied == 0 && moved <= 7));
        WARN_MESSAGE(moved <= 3,
            "Warning: The number of copies/moves should not exceed 3 if copy elision takes place."
            "Pay attention to this warning only if copy elision is enabled."
        );
    }
};

template <typename R>
R function() {
    noDefaultTag tag;
    R r(tag);
    r.fill();
    return r;
}

template <>
void function<void>() {}

template <typename R>
struct Functor {
    R operator()() const {
        return function<R>();
    }
};

tbb::task_arena& arena() {
    static tbb::task_arena a;
    return a;
}

template <typename F>
void TestExecute(F &f) {
    StateTrackableCounters::reset();
    ReturnType r{arena().execute(f)};
    r.check();
}

template <typename F>
void TestExecute(const F &f) {
    StateTrackableCounters::reset();
    ReturnType r{arena().execute(f)};
    r.check();
}
template <typename F>
void TestIsolate(F &f) {
    StateTrackableCounters::reset();
    ReturnType r{tbb::this_task_arena::isolate(f)};
    r.check();
}

template <typename F>
void TestIsolate(const F &f) {
    StateTrackableCounters::reset();
    ReturnType r{tbb::this_task_arena::isolate(f)};
    r.check();
}

void Test() {
    TestExecute(Functor<ReturnType>());
    Functor<ReturnType> f1;
    TestExecute(f1);
    TestExecute(function<ReturnType>);

    arena().execute(Functor<void>());
    Functor<void> f2;
    arena().execute(f2);
    arena().execute(function<void>);
    TestIsolate(Functor<ReturnType>());
    TestIsolate(f1);
    TestIsolate(function<ReturnType>);
    tbb::this_task_arena::isolate(Functor<void>());
    tbb::this_task_arena::isolate(f2);
    tbb::this_task_arena::isolate(function<void>);
}
}

void TestReturnValue() {
    TestReturnValueNS::Test();
}

//--------------------------------------------------//
1337
1338 // MyObserver checks if threads join to the same arena
1339 struct MyObserver: public tbb::task_scheduler_observer {
1340 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
1341 tbb::task_arena& my_arena;
1342 std::atomic<int>& my_failure_counter;
1343 std::atomic<int>& my_counter;
1344 utils::SpinBarrier& m_barrier;
1345
MyObserverMyObserver1346 MyObserver(tbb::task_arena& a,
1347 tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
1348 std::atomic<int>& failure_counter,
1349 std::atomic<int>& counter,
1350 utils::SpinBarrier& barrier)
1351 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1352 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
1353 observe(true);
1354 }
~MyObserverMyObserver1355 ~MyObserver(){
1356 observe(false);
1357 }
on_scheduler_entryMyObserver1358 void on_scheduler_entry(bool worker) override {
1359 if (worker) {
1360 ++my_counter;
1361 tbb::task_arena*& cur_arena = my_tls.local();
1362 if (cur_arena != nullptr && cur_arena != &my_arena) {
1363 ++my_failure_counter;
1364 }
1365 cur_arena = &my_arena;
1366 m_barrier.wait();
1367 }
1368 }
on_scheduler_exitMyObserver1369 void on_scheduler_exit(bool worker) override {
1370 if (worker) {
1371 m_barrier.wait(); // before wakeup
1372 m_barrier.wait(); // after wakeup
1373 }
1374 }
1375 };
1376
TestArenaWorkersMigrationWithNumThreads(int n_threads=0)1377 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
1378 if (n_threads == 0) {
1379 n_threads = tbb::this_task_arena::max_concurrency();
1380 }
1381
1382 const int max_n_arenas = 8;
1383 int n_arenas = 2;
1384 if(n_threads > 16) {
1385 n_arenas = max_n_arenas;
1386 } else if (n_threads > 8) {
1387 n_arenas = 4;
1388 }
1389
1390 int n_workers = n_threads - 1;
1391 n_workers = n_arenas * (n_workers / n_arenas);
1392 if (n_workers == 0) {
1393 return;
1394 }
1395
1396 n_threads = n_workers + 1;
1397 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);
1398
1399 const int n_repetitions = 20;
1400 const int n_outer_repetitions = 100;
1401 std::multiset<float> failure_ratio; // for median calculating
1402 utils::SpinBarrier barrier(n_threads);
1403 utils::SpinBarrier worker_barrier(n_workers);
1404 MyObserver* observer[max_n_arenas];
1405 std::vector<tbb::task_arena> arenas(n_arenas);
1406 std::atomic<int> failure_counter;
1407 std::atomic<int> counter;
1408 tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1409
1410 for (int i = 0; i < n_arenas; ++i) {
1411 arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master
1412 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
1413 }
1414
1415 int ii = 0;
1416 for (; ii < n_outer_repetitions; ++ii) {
1417 failure_counter = 0;
1418 counter = 0;
1419
1420 // Main code
1421 auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
1422 wakeup();
1423 for (int j = 0; j < n_repetitions; ++j) {
1424 barrier.wait(); // entry
1425 barrier.wait(); // exit1
1426 wakeup();
1427 barrier.wait(); // exit2
1428 }
1429 barrier.wait(); // entry
1430 barrier.wait(); // exit1
1431 barrier.wait(); // exit2
1432
1433 failure_ratio.insert(float(failure_counter) / counter);
1434 tls.clear();
1435 // collect 3 elements in failure_ratio before calculating median
1436 if (ii > 1) {
1437 std::multiset<float>::iterator it = failure_ratio.begin();
1438 std::advance(it, failure_ratio.size() / 2);
1439 if (*it < 0.02)
1440 break;
1441 }
1442 }
1443 for (int i = 0; i < n_arenas; ++i) {
1444 delete observer[i];
1445 }
1446 // check if median is so big
1447 std::multiset<float>::iterator it = failure_ratio.begin();
1448 std::advance(it, failure_ratio.size() / 2);
1449 // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
1450 if (*it > 0.05) {
1451 REPORT("Warning: So many cases when threads join to different arenas.\n");
1452 REQUIRE_MESSAGE(*it <= 0.3, "A lot of cases when threads join to different arenas.\n");
1453 }
1454 }
1455
TestArenaWorkersMigration()1456 void TestArenaWorkersMigration() {
1457 TestArenaWorkersMigrationWithNumThreads(4);
1458 if (tbb::this_task_arena::max_concurrency() != 4) {
1459 TestArenaWorkersMigrationWithNumThreads();
1460 }
1461 }
1462
1463 //--------------------------------------------------//
void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (auto blocked : { false, true }) {
            for (int trail = 0; trail < (blocked ? 10 : 10000); ++trail) {
                tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
                    CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified number of threads");
                    CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than specified by default");
                    local_id.local() = 1;
                    if (blocked) {
                        // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                        utils::Sleep(1);
                        barrier.wait();
                    }
                }, tbb::simple_partitioner());
                REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "more threads were created than the default number");
                if (blocked) {
                    REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default number");
                }
            }
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Checks only a subset of the reserved/external thread counts:
    // 0 and 1 reserved threads are the important cases, but it is also useful
    // to collect some statistics with other counts without consuming
    // the whole test session time checking every possible count.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

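// Checks that an exception thrown inside task_arena::execute() propagates to the
// calling thread, even when many external threads call execute() on the same arena
// concurrently.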
void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    int arena_concurrency = static_cast<int>(thread_number) / 2;
    tbb::task_arena test_arena(arena_concurrency, arena_concurrency);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

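// An observer that validates slot indices on arena entry: a thread's index must be
// within the arena concurrency, and worker threads must not occupy the slots reserved
// for external threads.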
class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override
    {}
public:
    simple_observer(tbb::task_arena& a, int maxConcurrency, int numReservedSlots)
        : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        observe(true);
    }

    ~simple_observer() {
        observe(false);
    }

    friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
        return lhs.my_idx < rhs.my_idx;
    }
};

std::atomic<int> simple_observer::idx_counter{};

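// Wraps a dynamically allocated arena for the stress test below. The status flag and
// the reader-writer mutex let one thread delete the arena (under a writer lock) while
// other threads safely use it under reader locks.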
struct arena_handler {
    enum arena_status {
        alive,
        deleting,
        deleted
    };

    tbb::task_arena* arena;

    std::atomic<arena_status> status{alive};
    tbb::spin_rw_mutex arena_in_use{};

    tbb::concurrent_set<simple_observer> observers;

    arena_handler(tbb::task_arena* ptr) : arena(ptr)
    {}

    friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
        return lhs.arena < rhs.arena;
    }
};

// TODO: Add observer operations
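// Randomly mixes arena creation/deletion, observer attach/detach, execute() and
// enqueue() across all threads to stress the interplay of these operations.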
void StressTestMixFunctionality() {
    enum operation_type {
        create_arena,
        delete_arena,
        attach_observer,
        detach_observer,
        arena_execute,
        enqueue_task,
        last_operation_marker
    };

    std::size_t operations_number = last_operation_marker;
    std::size_t thread_number = utils::get_platform_max_threads();
    utils::FastRandom<> operation_rnd(42);
    tbb::spin_mutex random_operation_guard;

    auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
        tbb::spin_mutex::scoped_lock lock(random_operation_guard);
        return static_cast<operation_type>(operation_rnd.get() % operations_number);
    };

    utils::FastRandom<> arena_rnd(42);
    tbb::spin_mutex random_arena_guard;
    auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
        tbb::spin_mutex::scoped_lock lock(random_arena_guard);
        return arena_rnd.get();
    };

    tbb::concurrent_set<arena_handler> arenas_pool;

    std::vector<std::thread> thread_pool;

    utils::SpinBarrier thread_barrier(thread_number);
    std::size_t max_operations = 20000;
    std::atomic<std::size_t> curr_operation{};

    auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) {
        for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) {
            if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                if (curr_arena->status == arena_handler::alive) {
                    return curr_arena;
                }
                else {
                    lock.release();
                }
            }
        }
        return arenas_pool.end();
    };

    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
            case create_arena:
            {
                arenas_pool.emplace(new tbb::task_arena());
                break;
            }
            case delete_arena:
            {
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    arena_handler::arena_status curr_status = arena_handler::alive;
                    if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                        break;
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};

                auto curr_arena = find_arena(lock);
                if (curr_arena != arenas_pool.end()) {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }
                break;
            }
            case detach_observer:
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->execute([]() {
                        tbb::affinity_partitioner aff;
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
                    });
                }

                break;
            }
            case enqueue_task:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
                }

                break;
            }
            case last_operation_marker:
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

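// A functor that re-enqueues a copy of itself into the arena until the shared task
// counter reaches its limit, keeping the arena workers busy; it also checks that the
// executing thread has already been marked in the thread-specific flag.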
struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};

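// Checks that threads blocked inside an arena (workers waiting on a condition variable
// and external threads blocked in execute()) sleep instead of actively spinning:
// TestCPUUserTime verifies that CPU user time stays low while they are blocked.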
void test_threads_sleep(int concurrency, int reserved_slots, int num_external_threads) {
    tbb::task_arena a(concurrency, reserved_slots);
    std::mutex m;
    std::condition_variable cond_var;
    bool completed{ false };
    utils::SpinBarrier barrier(concurrency - reserved_slots + 1);

    auto body = [&] {
        std::unique_lock<std::mutex> lock(m);
        cond_var.wait(lock, [&] { return completed == true; });
    };

    for (int i = 0; i < concurrency - reserved_slots; ++i) {
        a.enqueue([&] {
            body();
            barrier.signalNoWait();
        });
    }
    std::vector<std::thread> threads;
    for (int i = 0; i < num_external_threads; ++i) {
        threads.emplace_back([&]() { a.execute(body); });
    }
    TestCPUUserTime(concurrency);

    {
        std::lock_guard<std::mutex> lock(m);
        completed = true;
        cond_var.notify_all();
    }
    for (auto& t : threads) {
        t.join();
    }
    barrier.wait();
}

void test_threads_sleep(int concurrency, int reserved_slots) {
    test_threads_sleep(concurrency, reserved_slots, reserved_slots);
    test_threads_sleep(concurrency, reserved_slots, 2 * concurrency);
}

//--------------------------------------------------//

// This test requires TBB in an uninitialized state
//! \brief \ref requirement
TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
    REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was already in an initialized state");
    tbb::enumerable_thread_specific<int> ets;

    tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
    arena.execute([&ets] {
        tbb::parallel_for(0, 10000000, [&ets](int){
            ets.local() = 1;
            utils::doDummyWork(100);
        });
    });

    CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
}

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

#if !EMSCRIPTEN
//! On Emscripten, the FPU control state has not been set correctly
//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}
#endif

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

#if !EMSCRIPTEN
//! On Emscripten, the FPU control state has not been set correctly
//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}
#endif

//! Test for TBB workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

#if !EMSCRIPTEN
//! On Emscripten, the FPU control state has not been set correctly
//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}
#endif

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}

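//! Test that a twice-oversubscribed arena can be populated by workers and that
//! enqueued tasks keep being processed in it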
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}

#if TBB_USE_EXCEPTIONS
//! The test for an error when scheduling an empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
    * doctest::should_fail() // The test needs to be revised as the implementation uses assertions instead of exceptions
    * doctest::skip()        // Skip the test for now to avoid polluting the test log
){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif

#if !EMSCRIPTEN
//! On Emscripten, the FPU control state has not been set correctly
//! \brief \ref error_guessing
TEST_CASE("Test threads sleep") {
    for (auto concurrency_level : utils::concurrency_range()) {
        int conc = int(concurrency_level);
        test_threads_sleep(conc, 0);
        test_threads_sleep(conc, 1);
        test_threads_sleep(conc, conc/2);
        test_threads_sleep(conc, conc);
    }
}
#endif

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! The test for is_inside_task in arena::execute when called from inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! \brief \ref interface \ref requirement \ref regression
TEST_CASE("worker threads occupy slots in correct range") {
    std::vector<tbb::task_arena> arenas(42);
    for (auto& arena : arenas) {
        arena.initialize(1, 0);
    }

    std::atomic<int> counter{0};
    for (auto& arena : arenas) {
        arena.enqueue([&] {
            CHECK(tbb::this_task_arena::current_thread_index() == 0);
            ++counter;
        });
    }

    while (counter < 42) { utils::yield(); }
}