xref: /oneTBB/test/tbb/test_tbb_fork.cpp (revision 8dcbd5b1)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 //! \file test_tbb_fork.cpp
18 //! \brief Test for [sched.global_control] specification
19 
20 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
21 
22 #include "tbb/global_control.h"
23 #include "tbb/blocked_range.h"
24 #include "tbb/cache_aligned_allocator.h"
25 #include "tbb/parallel_for.h"
26 
// Lower and upper bound of the max_allowed_parallelism sweep in main();
// MinThread is also used there as the sweep step.
static constexpr int MinThread = 1;
static constexpr int MaxThread = 4;
29 
// Doctest is not used by this test; it is configured and included only to avoid compiler errors caused by the design of the common headers
31 #define DOCTEST_CONFIG_IMPLEMENT
32 #include "common/test.h"
33 
34 #include "common/utils.h"
35 #include "common/utils_assert.h"
36 
37 #if _WIN32||_WIN64
38 #include "tbb/concurrent_hash_map.h"
39 
40 HANDLE getCurrentThreadHandle()
41 {
42     HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE;
43 #if TBB_USE_ASSERT
44     BOOL res =
45 #endif
46     DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS );
47     __TBB_ASSERT( res, "Retrieving current thread handle failed" );
48     return hThr;
49 }
50 
51 bool threadTerminated(HANDLE h)
52 {
53     DWORD ret = WaitForSingleObjectEx(h, 0, FALSE);
54     return WAIT_OBJECT_0 == ret;
55 }
56 
57 struct Data {
58     HANDLE h;
59 };
60 
61 typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType;
62 
63 static TidTableType tidTable;
64 
65 #else
66 
67 #if __sun || __SUNPRO_CC
68 #define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2)
69 #endif
70 #include <signal.h>
71 #include <sys/types.h>
72 #include <unistd.h>
73 #include <sys/wait.h>
74 #include <sched.h>
75 
76 #include "tbb/tick_count.h"
77 
// Deliberately empty signal handler; main() installs it for SIGCHLD and SIGALRM.
void SigHandler(int /*signo*/)
{
}
79 
80 #endif // _WIN32||_WIN64
81 
82 class AllocTask {
83 public:
84     void operator() (const tbb::blocked_range<int> &r) const {
85 #if _WIN32||_WIN64
86         HANDLE h = getCurrentThreadHandle();
87         DWORD tid = GetCurrentThreadId();
88         {
89             TidTableType::accessor acc;
90             if (tidTable.insert(acc, tid)) {
91                 acc->second.h = h;
92             }
93         }
94 #endif
95         for (int y = r.begin(); y != r.end(); ++y) {
96             void *p = tbb::detail::r1::cache_aligned_allocate(7000);
97             tbb::detail::r1::cache_aligned_deallocate(p);
98         }
99     }
100     AllocTask() {}
101 };
102 
103 void CallParallelFor()
104 {
105     tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(),
106                       tbb::simple_partitioner());
107 }
108 
109 /* Regression test against data race between termination of workers
110    and setting blocking termination mode in main thread. */
111 class RunWorkersBody : utils::NoAssign {
112     bool wait_workers;
113 public:
114     RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {}
115     void operator()(const int /*threadID*/) const {
116         tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle::get();
117         CallParallelFor();
118         if (wait_workers) {
119             bool ok = tbb::finalize(tsi, std::nothrow);
120             ASSERT(ok, NULL);
121         } else {
122             tbb::task_scheduler_handle::release(tsi);
123         }
124     }
125 };
126 
127 void TestBlockNonblock()
128 {
129     for (int i=0; i<100; i++) {
130         utils::NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false));
131         RunWorkersBody(/*wait_workers=*/true)(0);
132     }
133 }
134 
135 class RunInNativeThread : utils::NoAssign {
136     bool blocking;
137 
138 public:
139     RunInNativeThread(bool blocking_) : blocking(blocking_) {}
140     void operator()(const int /*threadID*/) const {
141         tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle::get();
142         CallParallelFor();
143         if (blocking) {
144             bool ok = tbb::finalize(tsi, std::nothrow);
145             ASSERT(!ok, "Nested blocking terminate must fail.");
146         } else {
147             tbb::task_scheduler_handle::release(tsi);
148         }
149     }
150 };
151 
152 void TestTasksInThread()
153 {
154     tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
155     CallParallelFor();
156     utils::NativeParallelFor(2, RunInNativeThread(/*blocking=*/false));
157     bool ok = tbb::finalize(sch, std::nothrow);
158     ASSERT(ok, NULL);
159 }
160 
161 #if TBB_REVAMP_TODO
162 
163 #include "common/memory_usage.h"
164 
165 // check for memory leak during TBB task scheduler init/terminate life cycle
166 // TODO: move to test_task_scheduler_init after workers waiting productization
167 void TestSchedulerMemLeaks()
168 {
169     const int ITERS = 10;
170     int it;
171 
172     for (it=0; it<ITERS; it++) {
173         size_t memBefore = utils::GetMemoryUsage();
174 #if _MSC_VER && _DEBUG
175         // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG
176         _CrtMemState stateBefore, stateAfter, diffState;
177         _CrtMemCheckpoint(&stateBefore);
178 #endif
179         for (int i=0; i<100; i++) {
180             tbb::task_arena arena(1, 1); arena.initialize(); // right approach?
181             // tbb::task_scheduler_init sch(1);
182             tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
183             for (int k=0; k<10; k++) {
184                 // tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task();
185                 // tbb::task::enqueue(*t);
186                 arena.enqueue([&]{});
187             }
188             bool ok = tbb::finalize(sch, std::nothrow);
189             ASSERT(ok, NULL);
190         }
191 #if _MSC_VER && _DEBUG
192         _CrtMemCheckpoint(&stateAfter);
193         int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter);
194         ASSERT(!ret, "It must be no memory leaks at this point.");
195 #endif
196         if (utils::GetMemoryUsage() <= memBefore)
197             break;
198     }
199     ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?");
200 }
201 
202 #endif // TBB_REVAMP_TODO
203 
204 void TestNestingTSI()
205 {
206     // nesting with and without blocking is possible
207     for (int i=0; i<2; i++) {
208         tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle::get();
209         CallParallelFor();
210         tbb::task_scheduler_handle schBlock1 = tbb::task_scheduler_handle::get();
211         CallParallelFor();
212         if (i) {
213             tbb::task_scheduler_handle::release(schBlock1);
214         } else {
215             bool ok = tbb::finalize(schBlock1, std::nothrow);
216             ASSERT(!ok, "Nested blocking terminate must fail.");
217         }
218         bool ok = tbb::finalize(schBlock, std::nothrow);
219         ASSERT(ok, NULL);
220     }
221     {
222         tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle::get();
223         utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
224         bool ok = tbb::finalize(schBlock, std::nothrow);
225         ASSERT(ok, NULL);
226     }
227 }
228 
229 void TestAutoInit()
230 {
231     CallParallelFor(); // autoinit
232     // creation of blocking scheduler is possible, but one is not block
233     utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true));
234 }
235 
236 int main()
237 {
238     TestNestingTSI();
239     TestBlockNonblock();
240     TestTasksInThread();
241 
242 #if TBB_REVAMP_TODO
243     TestSchedulerMemLeaks();
244 #endif
245 
246     bool child = false;
247 #if _WIN32||_WIN64
248     DWORD masterTid = GetCurrentThreadId();
249 #else
250     struct sigaction sa;
251     sigset_t sig_set;
252 
253     sigemptyset(&sa.sa_mask);
254     sa.sa_flags = 0;
255     sa.sa_handler = SigHandler;
256     if (sigaction(SIGCHLD, &sa, NULL))
257         ASSERT(0, "sigaction failed");
258     if (sigaction(SIGALRM, &sa, NULL))
259         ASSERT(0, "sigaction failed");
260     // block SIGCHLD and SIGALRM, the mask is inherited by worker threads
261     sigemptyset(&sig_set);
262     sigaddset(&sig_set, SIGCHLD);
263     sigaddset(&sig_set, SIGALRM);
264     if (pthread_sigmask(SIG_BLOCK, &sig_set, NULL))
265         ASSERT(0, "pthread_sigmask failed");
266 #endif
267     utils::suppress_unused_warning(child);
268     for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) {
269         for (int i=0; i<20; i++) {
270             tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, threads);
271             {
272                 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
273                 bool ok = tbb::finalize( sch, std::nothrow );
274                 ASSERT(ok, NULL);
275             }
276             tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get();
277             CallParallelFor();
278             bool ok = tbb::finalize( sch, std::nothrow );
279             ASSERT(ok, NULL);
280 #if _WIN32||_WIN64
281             // check that there is no alive threads after terminate()
282             for (TidTableType::const_iterator it = tidTable.begin();
283                 it != tidTable.end(); ++it) {
284                 if (masterTid != it->first) {
285                     ASSERT(threadTerminated(it->second.h), NULL);
286                 }
287             }
288             tidTable.clear();
289 #else // _WIN32||_WIN64
290             if (child)
291                 exit(0);
292             else {
293                 pid_t pid = fork();
294                 if (!pid) {
295                     i = -1;
296                     child = true;
297                 } else {
298                     int sig;
299                     pid_t w_ret = 0;
300                     // wait for SIGCHLD up to timeout
301                     alarm(30);
302                     if (0 != sigwait(&sig_set, &sig))
303                         ASSERT(0, "sigwait failed");
304                     alarm(0);
305                     w_ret = waitpid(pid, NULL, WNOHANG);
306                     ASSERT(w_ret>=0, "waitpid failed");
307                     if (!w_ret) {
308                         ASSERT(!kill(pid, SIGKILL), NULL);
309                         w_ret = waitpid(pid, NULL, 0);
310                         ASSERT(w_ret!=-1, "waitpid failed");
311 
312                         ASSERT(0, "Hang after fork");
313                     }
314                     // clean pending signals (if any occurs since sigwait)
315                     sigset_t p_mask;
316                     for (;;) {
317                         sigemptyset(&p_mask);
318                         sigpending(&p_mask);
319                         if (sigismember(&p_mask, SIGALRM)
320                             || sigismember(&p_mask, SIGCHLD)) {
321                             if (0 != sigwait(&p_mask, &sig))
322                                 ASSERT(0, "sigwait failed");
323                         } else
324                             break;
325                     }
326                 }
327             }
328 #endif // _WIN32||_WIN64
329         }
330     }
331     // auto initialization at this point
332     TestAutoInit();
333 
334     return 0;
335 }
336