1 /*
2 Copyright (c) 2005-2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 //! \file test_tbb_fork.cpp
18 //! \brief Test for [sched.global_control] specification
19
20 #include "tbb/global_control.h"
21 #include "tbb/blocked_range.h"
22 #include "tbb/cache_aligned_allocator.h"
23 #include "tbb/parallel_for.h"
24
25 static const int MinThread = 1;
26 static const int MaxThread = 4;
27
28 // Doctest is not used here, but placed just to prevent compiler errors for bad headers design
29 #define DOCTEST_CONFIG_IMPLEMENT
30 #include "common/test.h"
31
32 #include "common/utils.h"
33 #include "common/utils_assert.h"
34
35 #if _WIN32||_WIN64
36 #include "tbb/concurrent_hash_map.h"
37
getCurrentThreadHandle()38 HANDLE getCurrentThreadHandle()
39 {
40 HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE;
41 #if TBB_USE_ASSERT
42 BOOL res =
43 #endif
44 DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS );
45 __TBB_ASSERT( res, "Retrieving current thread handle failed" );
46 return hThr;
47 }
48
threadTerminated(HANDLE h)49 bool threadTerminated(HANDLE h)
50 {
51 DWORD ret = WaitForSingleObjectEx(h, 0, FALSE);
52 return WAIT_OBJECT_0 == ret;
53 }
54
55 struct Data {
56 HANDLE h;
57 };
58
59 typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType;
60
61 static TidTableType tidTable;
62
63 #else
64
65 #if __sun || __SUNPRO_CC
66 #define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2)
67 #endif
68 #include <signal.h>
69 #include <sys/types.h>
70 #include <unistd.h>
71 #include <sys/wait.h>
72 #include <sched.h>
73
74 #include "tbb/tick_count.h"
75
SigHandler(int)76 void SigHandler(int) { }
77
78 #endif // _WIN32||_WIN64
79
80 class AllocTask {
81 public:
operator ()(const tbb::blocked_range<int> & r) const82 void operator() (const tbb::blocked_range<int> &r) const {
83 #if _WIN32||_WIN64
84 HANDLE h = getCurrentThreadHandle();
85 DWORD tid = GetCurrentThreadId();
86 {
87 TidTableType::accessor acc;
88 if (tidTable.insert(acc, tid)) {
89 acc->second.h = h;
90 }
91 }
92 #endif
93 for (int y = r.begin(); y != r.end(); ++y) {
94 void *p = tbb::detail::r1::cache_aligned_allocate(7000);
95 tbb::detail::r1::cache_aligned_deallocate(p);
96 }
97 }
AllocTask()98 AllocTask() {}
99 };
100
CallParallelFor()101 void CallParallelFor()
102 {
103 tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(),
104 tbb::simple_partitioner());
105 }
106
107 /* Regression test against data race between termination of workers
108 and setting blocking termination mode in main thread. */
109 class RunWorkersBody : utils::NoAssign {
110 bool wait_workers;
111 public:
RunWorkersBody(bool waitWorkers)112 RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {}
operator ()(const int) const113 void operator()(const int /*threadID*/) const {
114 tbb::task_scheduler_handle tsi{tbb::attach{}};
115 CallParallelFor();
116 if (wait_workers) {
117 bool ok = tbb::finalize(tsi, std::nothrow);
118 ASSERT(ok, nullptr);
119 } else {
120 tsi.release();
121 }
122 }
123 };
124
TestBlockNonblock()125 void TestBlockNonblock()
126 {
127 for (int i=0; i<100; i++) {
128 utils::NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false));
129 RunWorkersBody(/*wait_workers=*/true)(0);
130 }
131 }
132
133 class RunInNativeThread : utils::NoAssign {
134 bool blocking;
135
136 public:
RunInNativeThread(bool blocking_)137 RunInNativeThread(bool blocking_) : blocking(blocking_) {}
operator ()(const int) const138 void operator()(const int /*threadID*/) const {
139 tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle{tbb::attach{}};
140 CallParallelFor();
141 if (blocking) {
142 bool ok = tbb::finalize(tsi, std::nothrow);
143 ASSERT(!ok, "Nested blocking terminate must fail.");
144 } else {
145 tsi.release();
146 }
147 }
148 };
149
TestTasksInThread()150 void TestTasksInThread()
151 {
152 tbb::task_scheduler_handle sch{tbb::attach{}};
153 CallParallelFor();
154 utils::NativeParallelFor(2, RunInNativeThread(/*blocking=*/false));
155 bool ok = tbb::finalize(sch, std::nothrow);
156 ASSERT(ok, nullptr);
157 }
158
159 #if TBB_REVAMP_TODO
160
161 #include "common/memory_usage.h"
162
163 // check for memory leak during TBB task scheduler init/terminate life cycle
164 // TODO: move to test_task_scheduler_init after workers waiting productization
TestSchedulerMemLeaks()165 void TestSchedulerMemLeaks()
166 {
167 const int ITERS = 10;
168 int it;
169
170 for (it=0; it<ITERS; it++) {
171 size_t memBefore = utils::GetMemoryUsage();
172 #if _MSC_VER && _DEBUG
173 // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG
174 _CrtMemState stateBefore, stateAfter, diffState;
175 _CrtMemCheckpoint(&stateBefore);
176 #endif
177 for (int i=0; i<100; i++) {
178 tbb::task_arena arena(1, 1); arena.initialize(); // right approach?
179 // tbb::task_scheduler_init sch(1);
180 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
181 for (int k=0; k<10; k++) {
182 // tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task();
183 // tbb::task::enqueue(*t);
184 arena.enqueue([&]{});
185 }
186 bool ok = tbb::finalize(sch, std::nothrow);
187 ASSERT(ok, nullptr);
188 }
189 #if _MSC_VER && _DEBUG
190 _CrtMemCheckpoint(&stateAfter);
191 int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter);
192 ASSERT(!ret, "It must be no memory leaks at this point.");
193 #endif
194 if (utils::GetMemoryUsage() <= memBefore)
195 break;
196 }
197 ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?");
198 }
199
200 #endif // TBB_REVAMP_TODO
201
TestNestingTSI()202 void TestNestingTSI()
203 {
204 // nesting with and without blocking is possible
205 for (int i=0; i<2; i++) {
206 tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle{tbb::attach{}};
207 CallParallelFor();
208 tbb::task_scheduler_handle schBlock1;
209 schBlock1 = tbb::task_scheduler_handle{tbb::attach{}};
210 CallParallelFor();
211 if (i) {
212 schBlock1.release();
213 } else {
214 bool ok = tbb::finalize(schBlock1, std::nothrow);
215 ASSERT(!ok, "Nested blocking terminate must fail.");
216 }
217 bool ok = tbb::finalize(schBlock, std::nothrow);
218 ASSERT(ok, nullptr);
219 }
220 {
221 tbb::task_scheduler_handle schBlock{tbb::attach{}};
222 utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
223 bool ok = tbb::finalize(schBlock, std::nothrow);
224 ASSERT(ok, nullptr);
225 }
226 }
227
TestAutoInit()228 void TestAutoInit()
229 {
230 CallParallelFor(); // autoinit
231 // creation of blocking scheduler is possible, but one is not block
232 utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
233 }
234
main()235 int main()
236 {
237 TestNestingTSI();
238 TestBlockNonblock();
239 TestTasksInThread();
240
241 #if TBB_REVAMP_TODO
242 TestSchedulerMemLeaks();
243 #endif
244
245 bool child = false;
246 #if _WIN32||_WIN64
247 DWORD masterTid = GetCurrentThreadId();
248 #else
249 struct sigaction sa;
250 sigset_t sig_set;
251
252 sigemptyset(&sa.sa_mask);
253 sa.sa_flags = 0;
254 sa.sa_handler = SigHandler;
255 if (sigaction(SIGCHLD, &sa, nullptr))
256 ASSERT(0, "sigaction failed");
257 if (sigaction(SIGALRM, &sa, nullptr))
258 ASSERT(0, "sigaction failed");
259 // block SIGCHLD and SIGALRM, the mask is inherited by worker threads
260 sigemptyset(&sig_set);
261 sigaddset(&sig_set, SIGCHLD);
262 sigaddset(&sig_set, SIGALRM);
263 if (pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
264 ASSERT(0, "pthread_sigmask failed");
265 #endif
266 utils::suppress_unused_warning(child);
267 for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) {
268 for (int i=0; i<20; i++) {
269 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, threads);
270 {
271 tbb::task_scheduler_handle sch{tbb::attach{}};
272 bool ok = tbb::finalize( sch, std::nothrow );
273 ASSERT(ok, nullptr);
274 }
275 tbb::task_scheduler_handle sch{tbb::attach{}};
276 CallParallelFor();
277 bool ok = tbb::finalize( sch, std::nothrow );
278 ASSERT(ok, nullptr);
279 #if _WIN32||_WIN64
280 // check that there is no alive threads after terminate()
281 for (TidTableType::const_iterator it = tidTable.begin();
282 it != tidTable.end(); ++it) {
283 if (masterTid != it->first) {
284 ASSERT(threadTerminated(it->second.h), nullptr);
285 }
286 }
287 tidTable.clear();
288 #else // _WIN32||_WIN64
289 if (child)
290 exit(0);
291 else {
292 pid_t pid = fork();
293 if (!pid) {
294 i = -1;
295 child = true;
296 } else {
297 int sig;
298 pid_t w_ret = 0;
299 // wait for SIGCHLD up to timeout
300 alarm(30);
301 if (0 != sigwait(&sig_set, &sig))
302 ASSERT(0, "sigwait failed");
303 alarm(0);
304 w_ret = waitpid(pid, nullptr, WNOHANG);
305 ASSERT(w_ret>=0, "waitpid failed");
306 if (!w_ret) {
307 ASSERT(!kill(pid, SIGKILL), nullptr);
308 w_ret = waitpid(pid, nullptr, 0);
309 ASSERT(w_ret!=-1, "waitpid failed");
310
311 ASSERT(0, "Hang after fork");
312 }
313 // clean pending signals (if any occurs since sigwait)
314 sigset_t p_mask;
315 for (;;) {
316 sigemptyset(&p_mask);
317 sigpending(&p_mask);
318 if (sigismember(&p_mask, SIGALRM)
319 || sigismember(&p_mask, SIGCHLD)) {
320 if (0 != sigwait(&p_mask, &sig))
321 ASSERT(0, "sigwait failed");
322 } else
323 break;
324 }
325 }
326 }
327 #endif // _WIN32||_WIN64
328 }
329 }
330 // auto initialization at this point
331 TestAutoInit();
332
333 return 0;
334 }
335