1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_tbb_fork.cpp 18 //! \brief Test for [sched.global_control] specification 19 20 #include "tbb/global_control.h" 21 #include "tbb/blocked_range.h" 22 #include "tbb/cache_aligned_allocator.h" 23 #include "tbb/parallel_for.h" 24 25 static const int MinThread = 1; 26 static const int MaxThread = 4; 27 28 // Doctest is not used here, but placed just to prevent compiler errors for bad headers design 29 #define DOCTEST_CONFIG_IMPLEMENT 30 #include "common/test.h" 31 32 #include "common/utils.h" 33 #include "common/utils_assert.h" 34 35 #if _WIN32||_WIN64 36 #include "tbb/concurrent_hash_map.h" 37 38 HANDLE getCurrentThreadHandle() 39 { 40 HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE; 41 #if TBB_USE_ASSERT 42 BOOL res = 43 #endif 44 DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS ); 45 __TBB_ASSERT( res, "Retrieving current thread handle failed" ); 46 return hThr; 47 } 48 49 bool threadTerminated(HANDLE h) 50 { 51 DWORD ret = WaitForSingleObjectEx(h, 0, FALSE); 52 return WAIT_OBJECT_0 == ret; 53 } 54 55 struct Data { 56 HANDLE h; 57 }; 58 59 typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType; 60 61 static TidTableType tidTable; 62 63 #else 64 65 #if __sun || __SUNPRO_CC 66 #define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2) 67 #endif 68 #include <signal.h> 69 #include <sys/types.h> 70 #include <unistd.h> 71 #include <sys/wait.h> 72 #include <sched.h> 73 74 #include "tbb/tick_count.h" 75 76 void SigHandler(int) { } 77 78 #endif // _WIN32||_WIN64 79 80 class AllocTask { 81 public: 82 void operator() (const tbb::blocked_range<int> &r) const { 83 #if _WIN32||_WIN64 84 HANDLE h = getCurrentThreadHandle(); 85 DWORD tid = GetCurrentThreadId(); 86 { 87 TidTableType::accessor acc; 88 if (tidTable.insert(acc, tid)) { 89 acc->second.h = h; 90 } 91 } 92 #endif 93 for (int y = r.begin(); y != r.end(); ++y) { 94 void *p = tbb::detail::r1::cache_aligned_allocate(7000); 95 tbb::detail::r1::cache_aligned_deallocate(p); 96 } 97 } 98 AllocTask() {} 99 }; 100 101 void CallParallelFor() 102 { 103 tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(), 104 tbb::simple_partitioner()); 105 } 106 107 /* Regression test against data race between termination of workers 108 and setting blocking termination mode in main thread. */ 109 class RunWorkersBody : utils::NoAssign { 110 bool wait_workers; 111 public: 112 RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {} 113 void operator()(const int /*threadID*/) const { 114 tbb::task_scheduler_handle tsi{tbb::attach{}}; 115 CallParallelFor(); 116 if (wait_workers) { 117 bool ok = tbb::finalize(tsi, std::nothrow); 118 ASSERT(ok, nullptr); 119 } else { 120 tsi.release(); 121 } 122 } 123 }; 124 125 void TestBlockNonblock() 126 { 127 for (int i=0; i<100; i++) { 128 utils::NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false)); 129 RunWorkersBody(/*wait_workers=*/true)(0); 130 } 131 } 132 133 class RunInNativeThread : utils::NoAssign { 134 bool blocking; 135 136 public: 137 RunInNativeThread(bool blocking_) : blocking(blocking_) {} 138 void operator()(const int /*threadID*/) const { 139 tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle{tbb::attach{}}; 140 CallParallelFor(); 141 if (blocking) { 142 bool ok = tbb::finalize(tsi, std::nothrow); 143 ASSERT(!ok, "Nested blocking terminate must fail."); 144 } else { 145 tsi.release(); 146 } 147 } 148 }; 149 150 void TestTasksInThread() 151 { 152 tbb::task_scheduler_handle sch{tbb::attach{}}; 153 CallParallelFor(); 154 utils::NativeParallelFor(2, RunInNativeThread(/*blocking=*/false)); 155 bool ok = tbb::finalize(sch, std::nothrow); 156 ASSERT(ok, nullptr); 157 } 158 159 #if TBB_REVAMP_TODO 160 161 #include "common/memory_usage.h" 162 163 // check for memory leak during TBB task scheduler init/terminate life cycle 164 // TODO: move to test_task_scheduler_init after workers waiting productization 165 void TestSchedulerMemLeaks() 166 { 167 const int ITERS = 10; 168 int it; 169 170 for (it=0; it<ITERS; it++) { 171 size_t memBefore = utils::GetMemoryUsage(); 172 #if _MSC_VER && _DEBUG 173 // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG 174 _CrtMemState stateBefore, stateAfter, diffState; 175 _CrtMemCheckpoint(&stateBefore); 176 #endif 177 for (int i=0; i<100; i++) { 178 tbb::task_arena arena(1, 1); arena.initialize(); // right approach? 179 // tbb::task_scheduler_init sch(1); 180 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get(); 181 for (int k=0; k<10; k++) { 182 // tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task(); 183 // tbb::task::enqueue(*t); 184 arena.enqueue([&]{}); 185 } 186 bool ok = tbb::finalize(sch, std::nothrow); 187 ASSERT(ok, nullptr); 188 } 189 #if _MSC_VER && _DEBUG 190 _CrtMemCheckpoint(&stateAfter); 191 int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter); 192 ASSERT(!ret, "It must be no memory leaks at this point."); 193 #endif 194 if (utils::GetMemoryUsage() <= memBefore) 195 break; 196 } 197 ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?"); 198 } 199 200 #endif // TBB_REVAMP_TODO 201 202 void TestNestingTSI() 203 { 204 // nesting with and without blocking is possible 205 for (int i=0; i<2; i++) { 206 tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle{tbb::attach{}}; 207 CallParallelFor(); 208 tbb::task_scheduler_handle schBlock1; 209 schBlock1 = tbb::task_scheduler_handle{tbb::attach{}}; 210 CallParallelFor(); 211 if (i) { 212 schBlock1.release(); 213 } else { 214 bool ok = tbb::finalize(schBlock1, std::nothrow); 215 ASSERT(!ok, "Nested blocking terminate must fail."); 216 } 217 bool ok = tbb::finalize(schBlock, std::nothrow); 218 ASSERT(ok, nullptr); 219 } 220 { 221 tbb::task_scheduler_handle schBlock{tbb::attach{}}; 222 utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true)); 223 bool ok = tbb::finalize(schBlock, std::nothrow); 224 ASSERT(ok, nullptr); 225 } 226 } 227 228 void TestAutoInit() 229 { 230 CallParallelFor(); // autoinit 231 // creation of blocking scheduler is possible, but one is not block 232 utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true)); 233 } 234 235 int main() 236 { 237 TestNestingTSI(); 238 TestBlockNonblock(); 239 TestTasksInThread(); 240 241 #if TBB_REVAMP_TODO 242 TestSchedulerMemLeaks(); 243 #endif 244 245 bool child = false; 246 #if _WIN32||_WIN64 247 DWORD masterTid = GetCurrentThreadId(); 248 #else 249 struct sigaction sa; 250 sigset_t sig_set; 251 252 sigemptyset(&sa.sa_mask); 253 sa.sa_flags = 0; 254 sa.sa_handler = SigHandler; 255 if (sigaction(SIGCHLD, &sa, nullptr)) 256 ASSERT(0, "sigaction failed"); 257 if (sigaction(SIGALRM, &sa, nullptr)) 258 ASSERT(0, "sigaction failed"); 259 // block SIGCHLD and SIGALRM, the mask is inherited by worker threads 260 sigemptyset(&sig_set); 261 sigaddset(&sig_set, SIGCHLD); 262 sigaddset(&sig_set, SIGALRM); 263 if (pthread_sigmask(SIG_BLOCK, &sig_set, nullptr)) 264 ASSERT(0, "pthread_sigmask failed"); 265 #endif 266 utils::suppress_unused_warning(child); 267 for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) { 268 for (int i=0; i<20; i++) { 269 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, threads); 270 { 271 tbb::task_scheduler_handle sch{tbb::attach{}}; 272 bool ok = tbb::finalize( sch, std::nothrow ); 273 ASSERT(ok, nullptr); 274 } 275 tbb::task_scheduler_handle sch{tbb::attach{}}; 276 CallParallelFor(); 277 bool ok = tbb::finalize( sch, std::nothrow ); 278 ASSERT(ok, nullptr); 279 #if _WIN32||_WIN64 280 // check that there is no alive threads after terminate() 281 for (TidTableType::const_iterator it = tidTable.begin(); 282 it != tidTable.end(); ++it) { 283 if (masterTid != it->first) { 284 ASSERT(threadTerminated(it->second.h), nullptr); 285 } 286 } 287 tidTable.clear(); 288 #else // _WIN32||_WIN64 289 if (child) 290 exit(0); 291 else { 292 pid_t pid = fork(); 293 if (!pid) { 294 i = -1; 295 child = true; 296 } else { 297 int sig; 298 pid_t w_ret = 0; 299 // wait for SIGCHLD up to timeout 300 alarm(30); 301 if (0 != sigwait(&sig_set, &sig)) 302 ASSERT(0, "sigwait failed"); 303 alarm(0); 304 w_ret = waitpid(pid, nullptr, WNOHANG); 305 ASSERT(w_ret>=0, "waitpid failed"); 306 if (!w_ret) { 307 ASSERT(!kill(pid, SIGKILL), nullptr); 308 w_ret = waitpid(pid, nullptr, 0); 309 ASSERT(w_ret!=-1, "waitpid failed"); 310 311 ASSERT(0, "Hang after fork"); 312 } 313 // clean pending signals (if any occurs since sigwait) 314 sigset_t p_mask; 315 for (;;) { 316 sigemptyset(&p_mask); 317 sigpending(&p_mask); 318 if (sigismember(&p_mask, SIGALRM) 319 || sigismember(&p_mask, SIGCHLD)) { 320 if (0 != sigwait(&p_mask, &sig)) 321 ASSERT(0, "sigwait failed"); 322 } else 323 break; 324 } 325 } 326 } 327 #endif // _WIN32||_WIN64 328 } 329 } 330 // auto initialization at this point 331 TestAutoInit(); 332 333 return 0; 334 } 335