151c0b2f7Stbbdev /*
2*c21e688aSSergey Zheltov Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev //! \file test_tbb_fork.cpp
1851c0b2f7Stbbdev //! \brief Test for [sched.global_control] specification
1951c0b2f7Stbbdev
2051c0b2f7Stbbdev #include "tbb/global_control.h"
2151c0b2f7Stbbdev #include "tbb/blocked_range.h"
2251c0b2f7Stbbdev #include "tbb/cache_aligned_allocator.h"
2351c0b2f7Stbbdev #include "tbb/parallel_for.h"
2451c0b2f7Stbbdev
// Bounds of the parallelism levels that main() iterates over;
// MinThread is also used there as the loop step.
static constexpr int MinThread = 1;
static constexpr int MaxThread = 4;
2751c0b2f7Stbbdev
// Doctest is not used by this test; it is configured and included only to avoid compile errors caused by how the common test headers are designed.
2951c0b2f7Stbbdev #define DOCTEST_CONFIG_IMPLEMENT
3049e08aacStbbdev #include "common/test.h"
3151c0b2f7Stbbdev
3251c0b2f7Stbbdev #include "common/utils.h"
3351c0b2f7Stbbdev #include "common/utils_assert.h"
3451c0b2f7Stbbdev
3551c0b2f7Stbbdev #if _WIN32||_WIN64
3651c0b2f7Stbbdev #include "tbb/concurrent_hash_map.h"
3751c0b2f7Stbbdev
getCurrentThreadHandle()3851c0b2f7Stbbdev HANDLE getCurrentThreadHandle()
3951c0b2f7Stbbdev {
4051c0b2f7Stbbdev HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE;
4151c0b2f7Stbbdev #if TBB_USE_ASSERT
4251c0b2f7Stbbdev BOOL res =
4351c0b2f7Stbbdev #endif
4451c0b2f7Stbbdev DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS );
4551c0b2f7Stbbdev __TBB_ASSERT( res, "Retrieving current thread handle failed" );
4651c0b2f7Stbbdev return hThr;
4751c0b2f7Stbbdev }
4851c0b2f7Stbbdev
threadTerminated(HANDLE h)4951c0b2f7Stbbdev bool threadTerminated(HANDLE h)
5051c0b2f7Stbbdev {
5151c0b2f7Stbbdev DWORD ret = WaitForSingleObjectEx(h, 0, FALSE);
5251c0b2f7Stbbdev return WAIT_OBJECT_0 == ret;
5351c0b2f7Stbbdev }
5451c0b2f7Stbbdev
5551c0b2f7Stbbdev struct Data {
5651c0b2f7Stbbdev HANDLE h;
5751c0b2f7Stbbdev };
5851c0b2f7Stbbdev
5951c0b2f7Stbbdev typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType;
6051c0b2f7Stbbdev
6151c0b2f7Stbbdev static TidTableType tidTable;
6251c0b2f7Stbbdev
6351c0b2f7Stbbdev #else
6451c0b2f7Stbbdev
6551c0b2f7Stbbdev #if __sun || __SUNPRO_CC
6651c0b2f7Stbbdev #define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2)
6751c0b2f7Stbbdev #endif
6851c0b2f7Stbbdev #include <signal.h>
6951c0b2f7Stbbdev #include <sys/types.h>
7051c0b2f7Stbbdev #include <unistd.h>
7151c0b2f7Stbbdev #include <sys/wait.h>
7251c0b2f7Stbbdev #include <sched.h>
7351c0b2f7Stbbdev
7451c0b2f7Stbbdev #include "tbb/tick_count.h"
7551c0b2f7Stbbdev
// Empty signal handler; main() installs it for SIGCHLD and SIGALRM.
void SigHandler(int /*signum*/) {}
7751c0b2f7Stbbdev
7851c0b2f7Stbbdev #endif // _WIN32||_WIN64
7951c0b2f7Stbbdev
8051c0b2f7Stbbdev class AllocTask {
8151c0b2f7Stbbdev public:
operator ()(const tbb::blocked_range<int> & r) const8251c0b2f7Stbbdev void operator() (const tbb::blocked_range<int> &r) const {
8351c0b2f7Stbbdev #if _WIN32||_WIN64
8451c0b2f7Stbbdev HANDLE h = getCurrentThreadHandle();
8551c0b2f7Stbbdev DWORD tid = GetCurrentThreadId();
8651c0b2f7Stbbdev {
8751c0b2f7Stbbdev TidTableType::accessor acc;
8851c0b2f7Stbbdev if (tidTable.insert(acc, tid)) {
8951c0b2f7Stbbdev acc->second.h = h;
9051c0b2f7Stbbdev }
9151c0b2f7Stbbdev }
9251c0b2f7Stbbdev #endif
9351c0b2f7Stbbdev for (int y = r.begin(); y != r.end(); ++y) {
9451c0b2f7Stbbdev void *p = tbb::detail::r1::cache_aligned_allocate(7000);
9551c0b2f7Stbbdev tbb::detail::r1::cache_aligned_deallocate(p);
9651c0b2f7Stbbdev }
9751c0b2f7Stbbdev }
AllocTask()9851c0b2f7Stbbdev AllocTask() {}
9951c0b2f7Stbbdev };
10051c0b2f7Stbbdev
CallParallelFor()10151c0b2f7Stbbdev void CallParallelFor()
10251c0b2f7Stbbdev {
10351c0b2f7Stbbdev tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(),
10451c0b2f7Stbbdev tbb::simple_partitioner());
10551c0b2f7Stbbdev }
10651c0b2f7Stbbdev
10751c0b2f7Stbbdev /* Regression test against data race between termination of workers
10851c0b2f7Stbbdev and setting blocking termination mode in main thread. */
10951c0b2f7Stbbdev class RunWorkersBody : utils::NoAssign {
11051c0b2f7Stbbdev bool wait_workers;
11151c0b2f7Stbbdev public:
RunWorkersBody(bool waitWorkers)11251c0b2f7Stbbdev RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {}
operator ()(const int) const11351c0b2f7Stbbdev void operator()(const int /*threadID*/) const {
1145fc0a5f6SAlex tbb::task_scheduler_handle tsi{tbb::attach{}};
11551c0b2f7Stbbdev CallParallelFor();
11651c0b2f7Stbbdev if (wait_workers) {
11751c0b2f7Stbbdev bool ok = tbb::finalize(tsi, std::nothrow);
11857f524caSIlya Isaev ASSERT(ok, nullptr);
11951c0b2f7Stbbdev } else {
1205fc0a5f6SAlex tsi.release();
12151c0b2f7Stbbdev }
12251c0b2f7Stbbdev }
12351c0b2f7Stbbdev };
12451c0b2f7Stbbdev
TestBlockNonblock()12551c0b2f7Stbbdev void TestBlockNonblock()
12651c0b2f7Stbbdev {
12751c0b2f7Stbbdev for (int i=0; i<100; i++) {
12851c0b2f7Stbbdev utils::NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false));
12951c0b2f7Stbbdev RunWorkersBody(/*wait_workers=*/true)(0);
13051c0b2f7Stbbdev }
13151c0b2f7Stbbdev }
13251c0b2f7Stbbdev
13351c0b2f7Stbbdev class RunInNativeThread : utils::NoAssign {
13451c0b2f7Stbbdev bool blocking;
13551c0b2f7Stbbdev
13651c0b2f7Stbbdev public:
RunInNativeThread(bool blocking_)13751c0b2f7Stbbdev RunInNativeThread(bool blocking_) : blocking(blocking_) {}
operator ()(const int) const13851c0b2f7Stbbdev void operator()(const int /*threadID*/) const {
1395fc0a5f6SAlex tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle{tbb::attach{}};
14051c0b2f7Stbbdev CallParallelFor();
14151c0b2f7Stbbdev if (blocking) {
14251c0b2f7Stbbdev bool ok = tbb::finalize(tsi, std::nothrow);
14351c0b2f7Stbbdev ASSERT(!ok, "Nested blocking terminate must fail.");
14451c0b2f7Stbbdev } else {
1455fc0a5f6SAlex tsi.release();
14651c0b2f7Stbbdev }
14751c0b2f7Stbbdev }
14851c0b2f7Stbbdev };
14951c0b2f7Stbbdev
TestTasksInThread()15051c0b2f7Stbbdev void TestTasksInThread()
15151c0b2f7Stbbdev {
1525fc0a5f6SAlex tbb::task_scheduler_handle sch{tbb::attach{}};
15351c0b2f7Stbbdev CallParallelFor();
15451c0b2f7Stbbdev utils::NativeParallelFor(2, RunInNativeThread(/*blocking=*/false));
15551c0b2f7Stbbdev bool ok = tbb::finalize(sch, std::nothrow);
15657f524caSIlya Isaev ASSERT(ok, nullptr);
15751c0b2f7Stbbdev }
15851c0b2f7Stbbdev
15951c0b2f7Stbbdev #if TBB_REVAMP_TODO
16051c0b2f7Stbbdev
16151c0b2f7Stbbdev #include "common/memory_usage.h"
16251c0b2f7Stbbdev
16351c0b2f7Stbbdev // check for memory leak during TBB task scheduler init/terminate life cycle
16451c0b2f7Stbbdev // TODO: move to test_task_scheduler_init after workers waiting productization
TestSchedulerMemLeaks()16551c0b2f7Stbbdev void TestSchedulerMemLeaks()
16651c0b2f7Stbbdev {
16751c0b2f7Stbbdev const int ITERS = 10;
16851c0b2f7Stbbdev int it;
16951c0b2f7Stbbdev
17051c0b2f7Stbbdev for (it=0; it<ITERS; it++) {
17151c0b2f7Stbbdev size_t memBefore = utils::GetMemoryUsage();
17251c0b2f7Stbbdev #if _MSC_VER && _DEBUG
17351c0b2f7Stbbdev // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG
17451c0b2f7Stbbdev _CrtMemState stateBefore, stateAfter, diffState;
17551c0b2f7Stbbdev _CrtMemCheckpoint(&stateBefore);
17651c0b2f7Stbbdev #endif
17751c0b2f7Stbbdev for (int i=0; i<100; i++) {
17851c0b2f7Stbbdev tbb::task_arena arena(1, 1); arena.initialize(); // right approach?
17951c0b2f7Stbbdev // tbb::task_scheduler_init sch(1);
18051c0b2f7Stbbdev tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
18151c0b2f7Stbbdev for (int k=0; k<10; k++) {
18251c0b2f7Stbbdev // tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task();
18351c0b2f7Stbbdev // tbb::task::enqueue(*t);
18451c0b2f7Stbbdev arena.enqueue([&]{});
18551c0b2f7Stbbdev }
18651c0b2f7Stbbdev bool ok = tbb::finalize(sch, std::nothrow);
18757f524caSIlya Isaev ASSERT(ok, nullptr);
18851c0b2f7Stbbdev }
18951c0b2f7Stbbdev #if _MSC_VER && _DEBUG
19051c0b2f7Stbbdev _CrtMemCheckpoint(&stateAfter);
19151c0b2f7Stbbdev int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter);
19251c0b2f7Stbbdev ASSERT(!ret, "It must be no memory leaks at this point.");
19351c0b2f7Stbbdev #endif
19451c0b2f7Stbbdev if (utils::GetMemoryUsage() <= memBefore)
19551c0b2f7Stbbdev break;
19651c0b2f7Stbbdev }
19751c0b2f7Stbbdev ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?");
19851c0b2f7Stbbdev }
19951c0b2f7Stbbdev
20051c0b2f7Stbbdev #endif // TBB_REVAMP_TODO
20151c0b2f7Stbbdev
TestNestingTSI()20251c0b2f7Stbbdev void TestNestingTSI()
20351c0b2f7Stbbdev {
20451c0b2f7Stbbdev // nesting with and without blocking is possible
20551c0b2f7Stbbdev for (int i=0; i<2; i++) {
2065fc0a5f6SAlex tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle{tbb::attach{}};
20751c0b2f7Stbbdev CallParallelFor();
2085fc0a5f6SAlex tbb::task_scheduler_handle schBlock1;
2095fc0a5f6SAlex schBlock1 = tbb::task_scheduler_handle{tbb::attach{}};
21051c0b2f7Stbbdev CallParallelFor();
21151c0b2f7Stbbdev if (i) {
2125fc0a5f6SAlex schBlock1.release();
21351c0b2f7Stbbdev } else {
21451c0b2f7Stbbdev bool ok = tbb::finalize(schBlock1, std::nothrow);
21551c0b2f7Stbbdev ASSERT(!ok, "Nested blocking terminate must fail.");
21651c0b2f7Stbbdev }
21751c0b2f7Stbbdev bool ok = tbb::finalize(schBlock, std::nothrow);
21857f524caSIlya Isaev ASSERT(ok, nullptr);
21951c0b2f7Stbbdev }
22051c0b2f7Stbbdev {
2215fc0a5f6SAlex tbb::task_scheduler_handle schBlock{tbb::attach{}};
22251c0b2f7Stbbdev utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
22351c0b2f7Stbbdev bool ok = tbb::finalize(schBlock, std::nothrow);
22457f524caSIlya Isaev ASSERT(ok, nullptr);
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev }
22751c0b2f7Stbbdev
TestAutoInit()22851c0b2f7Stbbdev void TestAutoInit()
22951c0b2f7Stbbdev {
23051c0b2f7Stbbdev CallParallelFor(); // autoinit
23151c0b2f7Stbbdev // creation of blocking scheduler is possible, but one is not block
23251c0b2f7Stbbdev utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
23351c0b2f7Stbbdev }
23451c0b2f7Stbbdev
main()23551c0b2f7Stbbdev int main()
23651c0b2f7Stbbdev {
23751c0b2f7Stbbdev TestNestingTSI();
23851c0b2f7Stbbdev TestBlockNonblock();
23951c0b2f7Stbbdev TestTasksInThread();
24051c0b2f7Stbbdev
24151c0b2f7Stbbdev #if TBB_REVAMP_TODO
24251c0b2f7Stbbdev TestSchedulerMemLeaks();
24351c0b2f7Stbbdev #endif
24451c0b2f7Stbbdev
24551c0b2f7Stbbdev bool child = false;
24651c0b2f7Stbbdev #if _WIN32||_WIN64
24751c0b2f7Stbbdev DWORD masterTid = GetCurrentThreadId();
24851c0b2f7Stbbdev #else
24951c0b2f7Stbbdev struct sigaction sa;
25051c0b2f7Stbbdev sigset_t sig_set;
25151c0b2f7Stbbdev
25251c0b2f7Stbbdev sigemptyset(&sa.sa_mask);
25351c0b2f7Stbbdev sa.sa_flags = 0;
25451c0b2f7Stbbdev sa.sa_handler = SigHandler;
25557f524caSIlya Isaev if (sigaction(SIGCHLD, &sa, nullptr))
25651c0b2f7Stbbdev ASSERT(0, "sigaction failed");
25757f524caSIlya Isaev if (sigaction(SIGALRM, &sa, nullptr))
25851c0b2f7Stbbdev ASSERT(0, "sigaction failed");
25951c0b2f7Stbbdev // block SIGCHLD and SIGALRM, the mask is inherited by worker threads
26051c0b2f7Stbbdev sigemptyset(&sig_set);
26151c0b2f7Stbbdev sigaddset(&sig_set, SIGCHLD);
26251c0b2f7Stbbdev sigaddset(&sig_set, SIGALRM);
26357f524caSIlya Isaev if (pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
26451c0b2f7Stbbdev ASSERT(0, "pthread_sigmask failed");
26551c0b2f7Stbbdev #endif
26651c0b2f7Stbbdev utils::suppress_unused_warning(child);
26751c0b2f7Stbbdev for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) {
26851c0b2f7Stbbdev for (int i=0; i<20; i++) {
26951c0b2f7Stbbdev tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, threads);
27051c0b2f7Stbbdev {
2715fc0a5f6SAlex tbb::task_scheduler_handle sch{tbb::attach{}};
27251c0b2f7Stbbdev bool ok = tbb::finalize( sch, std::nothrow );
27357f524caSIlya Isaev ASSERT(ok, nullptr);
27451c0b2f7Stbbdev }
2755fc0a5f6SAlex tbb::task_scheduler_handle sch{tbb::attach{}};
27651c0b2f7Stbbdev CallParallelFor();
27751c0b2f7Stbbdev bool ok = tbb::finalize( sch, std::nothrow );
27857f524caSIlya Isaev ASSERT(ok, nullptr);
27951c0b2f7Stbbdev #if _WIN32||_WIN64
28051c0b2f7Stbbdev // check that there is no alive threads after terminate()
28151c0b2f7Stbbdev for (TidTableType::const_iterator it = tidTable.begin();
28251c0b2f7Stbbdev it != tidTable.end(); ++it) {
28351c0b2f7Stbbdev if (masterTid != it->first) {
28457f524caSIlya Isaev ASSERT(threadTerminated(it->second.h), nullptr);
28551c0b2f7Stbbdev }
28651c0b2f7Stbbdev }
28751c0b2f7Stbbdev tidTable.clear();
28851c0b2f7Stbbdev #else // _WIN32||_WIN64
28951c0b2f7Stbbdev if (child)
29051c0b2f7Stbbdev exit(0);
29151c0b2f7Stbbdev else {
29251c0b2f7Stbbdev pid_t pid = fork();
29351c0b2f7Stbbdev if (!pid) {
29451c0b2f7Stbbdev i = -1;
29551c0b2f7Stbbdev child = true;
29651c0b2f7Stbbdev } else {
29751c0b2f7Stbbdev int sig;
29851c0b2f7Stbbdev pid_t w_ret = 0;
29951c0b2f7Stbbdev // wait for SIGCHLD up to timeout
30051c0b2f7Stbbdev alarm(30);
30151c0b2f7Stbbdev if (0 != sigwait(&sig_set, &sig))
30251c0b2f7Stbbdev ASSERT(0, "sigwait failed");
30351c0b2f7Stbbdev alarm(0);
30457f524caSIlya Isaev w_ret = waitpid(pid, nullptr, WNOHANG);
30551c0b2f7Stbbdev ASSERT(w_ret>=0, "waitpid failed");
30651c0b2f7Stbbdev if (!w_ret) {
30757f524caSIlya Isaev ASSERT(!kill(pid, SIGKILL), nullptr);
30857f524caSIlya Isaev w_ret = waitpid(pid, nullptr, 0);
30951c0b2f7Stbbdev ASSERT(w_ret!=-1, "waitpid failed");
31051c0b2f7Stbbdev
31151c0b2f7Stbbdev ASSERT(0, "Hang after fork");
31251c0b2f7Stbbdev }
31351c0b2f7Stbbdev // clean pending signals (if any occurs since sigwait)
31451c0b2f7Stbbdev sigset_t p_mask;
31551c0b2f7Stbbdev for (;;) {
31651c0b2f7Stbbdev sigemptyset(&p_mask);
31751c0b2f7Stbbdev sigpending(&p_mask);
31851c0b2f7Stbbdev if (sigismember(&p_mask, SIGALRM)
31951c0b2f7Stbbdev || sigismember(&p_mask, SIGCHLD)) {
32051c0b2f7Stbbdev if (0 != sigwait(&p_mask, &sig))
32151c0b2f7Stbbdev ASSERT(0, "sigwait failed");
32251c0b2f7Stbbdev } else
32351c0b2f7Stbbdev break;
32451c0b2f7Stbbdev }
32551c0b2f7Stbbdev }
32651c0b2f7Stbbdev }
32751c0b2f7Stbbdev #endif // _WIN32||_WIN64
32851c0b2f7Stbbdev }
32951c0b2f7Stbbdev }
33051c0b2f7Stbbdev // auto initialization at this point
33151c0b2f7Stbbdev TestAutoInit();
33251c0b2f7Stbbdev
33351c0b2f7Stbbdev return 0;
33451c0b2f7Stbbdev }
335