xref: /oneTBB/test/tbb/test_tbb_fork.cpp (revision 57f524ca)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_tbb_fork.cpp
18 //! \brief Test for [sched.global_control] specification
19 
20 #include "tbb/global_control.h"
21 #include "tbb/blocked_range.h"
22 #include "tbb/cache_aligned_allocator.h"
23 #include "tbb/parallel_for.h"
24 
25 static const int MinThread = 1;
26 static const int MaxThread = 4;
27 
28 // Doctest is not used here, but placed just to prevent compiler errors for bad headers design
29 #define DOCTEST_CONFIG_IMPLEMENT
30 #include "common/test.h"
31 
32 #include "common/utils.h"
33 #include "common/utils_assert.h"
34 
35 #if _WIN32||_WIN64
36 #include "tbb/concurrent_hash_map.h"
37 
38 HANDLE getCurrentThreadHandle()
39 {
40     HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE;
41 #if TBB_USE_ASSERT
42     BOOL res =
43 #endif
44     DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS );
45     __TBB_ASSERT( res, "Retrieving current thread handle failed" );
46     return hThr;
47 }
48 
49 bool threadTerminated(HANDLE h)
50 {
51     DWORD ret = WaitForSingleObjectEx(h, 0, FALSE);
52     return WAIT_OBJECT_0 == ret;
53 }
54 
55 struct Data {
56     HANDLE h;
57 };
58 
59 typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType;
60 
61 static TidTableType tidTable;
62 
63 #else
64 
65 #if __sun || __SUNPRO_CC
66 #define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2)
67 #endif
68 #include <signal.h>
69 #include <sys/types.h>
70 #include <unistd.h>
71 #include <sys/wait.h>
72 #include <sched.h>
73 
74 #include "tbb/tick_count.h"
75 
76 void SigHandler(int) { }
77 
78 #endif // _WIN32||_WIN64
79 
80 class AllocTask {
81 public:
82     void operator() (const tbb::blocked_range<int> &r) const {
83 #if _WIN32||_WIN64
84         HANDLE h = getCurrentThreadHandle();
85         DWORD tid = GetCurrentThreadId();
86         {
87             TidTableType::accessor acc;
88             if (tidTable.insert(acc, tid)) {
89                 acc->second.h = h;
90             }
91         }
92 #endif
93         for (int y = r.begin(); y != r.end(); ++y) {
94             void *p = tbb::detail::r1::cache_aligned_allocate(7000);
95             tbb::detail::r1::cache_aligned_deallocate(p);
96         }
97     }
98     AllocTask() {}
99 };
100 
101 void CallParallelFor()
102 {
103     tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(),
104                       tbb::simple_partitioner());
105 }
106 
107 /* Regression test against data race between termination of workers
108    and setting blocking termination mode in main thread. */
109 class RunWorkersBody : utils::NoAssign {
110     bool wait_workers;
111 public:
112     RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {}
113     void operator()(const int /*threadID*/) const {
114         tbb::task_scheduler_handle tsi{tbb::attach{}};
115         CallParallelFor();
116         if (wait_workers) {
117             bool ok = tbb::finalize(tsi, std::nothrow);
118             ASSERT(ok, nullptr);
119         } else {
120             tsi.release();
121         }
122     }
123 };
124 
125 void TestBlockNonblock()
126 {
127     for (int i=0; i<100; i++) {
128         utils::NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false));
129         RunWorkersBody(/*wait_workers=*/true)(0);
130     }
131 }
132 
133 class RunInNativeThread : utils::NoAssign {
134     bool blocking;
135 
136 public:
137     RunInNativeThread(bool blocking_) : blocking(blocking_) {}
138     void operator()(const int /*threadID*/) const {
139         tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle{tbb::attach{}};
140         CallParallelFor();
141         if (blocking) {
142             bool ok = tbb::finalize(tsi, std::nothrow);
143             ASSERT(!ok, "Nested blocking terminate must fail.");
144         } else {
145             tsi.release();
146         }
147     }
148 };
149 
150 void TestTasksInThread()
151 {
152     tbb::task_scheduler_handle sch{tbb::attach{}};
153     CallParallelFor();
154     utils::NativeParallelFor(2, RunInNativeThread(/*blocking=*/false));
155     bool ok = tbb::finalize(sch, std::nothrow);
156     ASSERT(ok, nullptr);
157 }
158 
159 #if TBB_REVAMP_TODO
160 
161 #include "common/memory_usage.h"
162 
163 // check for memory leak during TBB task scheduler init/terminate life cycle
164 // TODO: move to test_task_scheduler_init after workers waiting productization
165 void TestSchedulerMemLeaks()
166 {
167     const int ITERS = 10;
168     int it;
169 
170     for (it=0; it<ITERS; it++) {
171         size_t memBefore = utils::GetMemoryUsage();
172 #if _MSC_VER && _DEBUG
173         // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG
174         _CrtMemState stateBefore, stateAfter, diffState;
175         _CrtMemCheckpoint(&stateBefore);
176 #endif
177         for (int i=0; i<100; i++) {
178             tbb::task_arena arena(1, 1); arena.initialize(); // right approach?
179             // tbb::task_scheduler_init sch(1);
180             tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
181             for (int k=0; k<10; k++) {
182                 // tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task();
183                 // tbb::task::enqueue(*t);
184                 arena.enqueue([&]{});
185             }
186             bool ok = tbb::finalize(sch, std::nothrow);
187             ASSERT(ok, nullptr);
188         }
189 #if _MSC_VER && _DEBUG
190         _CrtMemCheckpoint(&stateAfter);
191         int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter);
192         ASSERT(!ret, "It must be no memory leaks at this point.");
193 #endif
194         if (utils::GetMemoryUsage() <= memBefore)
195             break;
196     }
197     ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?");
198 }
199 
200 #endif // TBB_REVAMP_TODO
201 
202 void TestNestingTSI()
203 {
204     // nesting with and without blocking is possible
205     for (int i=0; i<2; i++) {
206         tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle{tbb::attach{}};
207         CallParallelFor();
208         tbb::task_scheduler_handle schBlock1;
209         schBlock1 = tbb::task_scheduler_handle{tbb::attach{}};
210         CallParallelFor();
211         if (i) {
212             schBlock1.release();
213         } else {
214             bool ok = tbb::finalize(schBlock1, std::nothrow);
215             ASSERT(!ok, "Nested blocking terminate must fail.");
216         }
217         bool ok = tbb::finalize(schBlock, std::nothrow);
218         ASSERT(ok, nullptr);
219     }
220     {
221         tbb::task_scheduler_handle schBlock{tbb::attach{}};
222         utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
223         bool ok = tbb::finalize(schBlock, std::nothrow);
224         ASSERT(ok, nullptr);
225     }
226 }
227 
228 void TestAutoInit()
229 {
230     CallParallelFor(); // autoinit
231     // creation of blocking scheduler is possible, but one is not block
232     utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
233 }
234 
235 int main()
236 {
237     TestNestingTSI();
238     TestBlockNonblock();
239     TestTasksInThread();
240 
241 #if TBB_REVAMP_TODO
242     TestSchedulerMemLeaks();
243 #endif
244 
245     bool child = false;
246 #if _WIN32||_WIN64
247     DWORD masterTid = GetCurrentThreadId();
248 #else
249     struct sigaction sa;
250     sigset_t sig_set;
251 
252     sigemptyset(&sa.sa_mask);
253     sa.sa_flags = 0;
254     sa.sa_handler = SigHandler;
255     if (sigaction(SIGCHLD, &sa, nullptr))
256         ASSERT(0, "sigaction failed");
257     if (sigaction(SIGALRM, &sa, nullptr))
258         ASSERT(0, "sigaction failed");
259     // block SIGCHLD and SIGALRM, the mask is inherited by worker threads
260     sigemptyset(&sig_set);
261     sigaddset(&sig_set, SIGCHLD);
262     sigaddset(&sig_set, SIGALRM);
263     if (pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
264         ASSERT(0, "pthread_sigmask failed");
265 #endif
266     utils::suppress_unused_warning(child);
267     for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) {
268         for (int i=0; i<20; i++) {
269             tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, threads);
270             {
271                 tbb::task_scheduler_handle sch{tbb::attach{}};
272                 bool ok = tbb::finalize( sch, std::nothrow );
273                 ASSERT(ok, nullptr);
274             }
275             tbb::task_scheduler_handle sch{tbb::attach{}};
276             CallParallelFor();
277             bool ok = tbb::finalize( sch, std::nothrow );
278             ASSERT(ok, nullptr);
279 #if _WIN32||_WIN64
280             // check that there is no alive threads after terminate()
281             for (TidTableType::const_iterator it = tidTable.begin();
282                 it != tidTable.end(); ++it) {
283                 if (masterTid != it->first) {
284                     ASSERT(threadTerminated(it->second.h), nullptr);
285                 }
286             }
287             tidTable.clear();
288 #else // _WIN32||_WIN64
289             if (child)
290                 exit(0);
291             else {
292                 pid_t pid = fork();
293                 if (!pid) {
294                     i = -1;
295                     child = true;
296                 } else {
297                     int sig;
298                     pid_t w_ret = 0;
299                     // wait for SIGCHLD up to timeout
300                     alarm(30);
301                     if (0 != sigwait(&sig_set, &sig))
302                         ASSERT(0, "sigwait failed");
303                     alarm(0);
304                     w_ret = waitpid(pid, nullptr, WNOHANG);
305                     ASSERT(w_ret>=0, "waitpid failed");
306                     if (!w_ret) {
307                         ASSERT(!kill(pid, SIGKILL), nullptr);
308                         w_ret = waitpid(pid, nullptr, 0);
309                         ASSERT(w_ret!=-1, "waitpid failed");
310 
311                         ASSERT(0, "Hang after fork");
312                     }
313                     // clean pending signals (if any occurs since sigwait)
314                     sigset_t p_mask;
315                     for (;;) {
316                         sigemptyset(&p_mask);
317                         sigpending(&p_mask);
318                         if (sigismember(&p_mask, SIGALRM)
319                             || sigismember(&p_mask, SIGCHLD)) {
320                             if (0 != sigwait(&p_mask, &sig))
321                                 ASSERT(0, "sigwait failed");
322                         } else
323                             break;
324                     }
325                 }
326             }
327 #endif // _WIN32||_WIN64
328         }
329     }
330     // auto initialization at this point
331     TestAutoInit();
332 
333     return 0;
334 }
335