1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 //! \file test_tbb_fork.cpp 18 //! \brief Test for [sched.global_control] specification 19 20 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1 21 22 #include "tbb/global_control.h" 23 #include "tbb/blocked_range.h" 24 #include "tbb/cache_aligned_allocator.h" 25 #include "tbb/parallel_for.h" 26 27 static const int MinThread = 1; 28 static const int MaxThread = 4; 29 30 // Doctest is not used here, but placed just to prevent compiler errors for bad headers design 31 #define DOCTEST_CONFIG_IMPLEMENT 32 #include "common/test.h" 33 34 #include "common/utils.h" 35 #include "common/utils_assert.h" 36 37 #if _WIN32||_WIN64 38 #include "tbb/concurrent_hash_map.h" 39 40 HANDLE getCurrentThreadHandle() 41 { 42 HANDLE hProc = GetCurrentProcess(), hThr = INVALID_HANDLE_VALUE; 43 #if TBB_USE_ASSERT 44 BOOL res = 45 #endif 46 DuplicateHandle( hProc, GetCurrentThread(), hProc, &hThr, 0, FALSE, DUPLICATE_SAME_ACCESS ); 47 __TBB_ASSERT( res, "Retrieving current thread handle failed" ); 48 return hThr; 49 } 50 51 bool threadTerminated(HANDLE h) 52 { 53 DWORD ret = WaitForSingleObjectEx(h, 0, FALSE); 54 return WAIT_OBJECT_0 == ret; 55 } 56 57 struct Data { 58 HANDLE h; 59 }; 60 61 typedef tbb::concurrent_hash_map<DWORD, Data> TidTableType; 62 63 static TidTableType tidTable; 64 65 #else 66 67 #if __sun || __SUNPRO_CC 68 #define _POSIX_PTHREAD_SEMANTICS 1 // to get standard-conforming sigwait(2) 69 #endif 70 #include <signal.h> 71 #include <sys/types.h> 72 #include <unistd.h> 73 #include <sys/wait.h> 74 #include <sched.h> 75 76 #include "tbb/tick_count.h" 77 78 void SigHandler(int) { } 79 80 #endif // _WIN32||_WIN64 81 82 class AllocTask { 83 public: 84 void operator() (const tbb::blocked_range<int> &r) const { 85 #if _WIN32||_WIN64 86 HANDLE h = getCurrentThreadHandle(); 87 DWORD tid = GetCurrentThreadId(); 88 { 89 TidTableType::accessor acc; 90 if (tidTable.insert(acc, tid)) { 91 acc->second.h = h; 92 } 93 } 94 #endif 95 for (int y = r.begin(); y != r.end(); ++y) { 96 void *p = tbb::detail::r1::cache_aligned_allocate(7000); 97 tbb::detail::r1::cache_aligned_deallocate(p); 98 } 99 } 100 AllocTask() {} 101 }; 102 103 void CallParallelFor() 104 { 105 tbb::parallel_for(tbb::blocked_range<int>(0, 10000, 1), AllocTask(), 106 tbb::simple_partitioner()); 107 } 108 109 /* Regression test against data race between termination of workers 110 and setting blocking termination mode in main thread. */ 111 class RunWorkersBody : utils::NoAssign { 112 bool wait_workers; 113 public: 114 RunWorkersBody(bool waitWorkers) : wait_workers(waitWorkers) {} 115 void operator()(const int /*threadID*/) const { 116 tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle::get(); 117 CallParallelFor(); 118 if (wait_workers) { 119 bool ok = tbb::finalize(tsi, std::nothrow); 120 ASSERT(ok, NULL); 121 } else { 122 tbb::task_scheduler_handle::release(tsi); 123 } 124 } 125 }; 126 127 void TestBlockNonblock() 128 { 129 for (int i=0; i<100; i++) { 130 utils::NativeParallelFor(4, RunWorkersBody(/*wait_workers=*/false)); 131 RunWorkersBody(/*wait_workers=*/true)(0); 132 } 133 } 134 135 class RunInNativeThread : utils::NoAssign { 136 bool blocking; 137 138 public: 139 RunInNativeThread(bool blocking_) : blocking(blocking_) {} 140 void operator()(const int /*threadID*/) const { 141 tbb::task_scheduler_handle tsi = tbb::task_scheduler_handle::get(); 142 CallParallelFor(); 143 if (blocking) { 144 bool ok = tbb::finalize(tsi, std::nothrow); 145 ASSERT(!ok, "Nested blocking terminate must fail."); 146 } else { 147 tbb::task_scheduler_handle::release(tsi); 148 } 149 } 150 }; 151 152 void TestTasksInThread() 153 { 154 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get(); 155 CallParallelFor(); 156 utils::NativeParallelFor(2, RunInNativeThread(/*blocking=*/false)); 157 bool ok = tbb::finalize(sch, std::nothrow); 158 ASSERT(ok, NULL); 159 } 160 161 #if TBB_REVAMP_TODO 162 163 #include "common/memory_usage.h" 164 165 // check for memory leak during TBB task scheduler init/terminate life cycle 166 // TODO: move to test_task_scheduler_init after workers waiting productization 167 void TestSchedulerMemLeaks() 168 { 169 const int ITERS = 10; 170 int it; 171 172 for (it=0; it<ITERS; it++) { 173 size_t memBefore = utils::GetMemoryUsage(); 174 #if _MSC_VER && _DEBUG 175 // _CrtMemCheckpoint() and _CrtMemDifference are non-empty only in _DEBUG 176 _CrtMemState stateBefore, stateAfter, diffState; 177 _CrtMemCheckpoint(&stateBefore); 178 #endif 179 for (int i=0; i<100; i++) { 180 tbb::task_arena arena(1, 1); arena.initialize(); // right approach? 181 // tbb::task_scheduler_init sch(1); 182 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get(); 183 for (int k=0; k<10; k++) { 184 // tbb::empty_task *t = new( tbb::task::allocate_root() ) tbb::empty_task(); 185 // tbb::task::enqueue(*t); 186 arena.enqueue([&]{}); 187 } 188 bool ok = tbb::finalize(sch, std::nothrow); 189 ASSERT(ok, NULL); 190 } 191 #if _MSC_VER && _DEBUG 192 _CrtMemCheckpoint(&stateAfter); 193 int ret = _CrtMemDifference(&diffState, &stateBefore, &stateAfter); 194 ASSERT(!ret, "It must be no memory leaks at this point."); 195 #endif 196 if (utils::GetMemoryUsage() <= memBefore) 197 break; 198 } 199 ASSERT(it < ITERS, "Memory consumption has not stabilized. Memory Leak?"); 200 } 201 202 #endif // TBB_REVAMP_TODO 203 204 void TestNestingTSI() 205 { 206 // nesting with and without blocking is possible 207 for (int i=0; i<2; i++) { 208 tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle::get(); 209 CallParallelFor(); 210 tbb::task_scheduler_handle schBlock1 = tbb::task_scheduler_handle::get(); 211 CallParallelFor(); 212 if (i) { 213 tbb::task_scheduler_handle::release(schBlock1); 214 } else { 215 bool ok = tbb::finalize(schBlock1, std::nothrow); 216 ASSERT(!ok, "Nested blocking terminate must fail."); 217 } 218 bool ok = tbb::finalize(schBlock, std::nothrow); 219 ASSERT(ok, NULL); 220 } 221 { 222 tbb::task_scheduler_handle schBlock = tbb::task_scheduler_handle::get(); 223 utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true)); 224 bool ok = tbb::finalize(schBlock, std::nothrow); 225 ASSERT(ok, NULL); 226 } 227 } 228 229 void TestAutoInit() 230 { 231 CallParallelFor(); // autoinit 232 // creation of blocking scheduler is possible, but one is not block 233 utils::NativeParallelFor(1, RunInNativeThread(/*blocking=*/true)); 234 } 235 236 int main() 237 { 238 TestNestingTSI(); 239 TestBlockNonblock(); 240 TestTasksInThread(); 241 242 #if TBB_REVAMP_TODO 243 TestSchedulerMemLeaks(); 244 #endif 245 246 bool child = false; 247 #if _WIN32||_WIN64 248 DWORD masterTid = GetCurrentThreadId(); 249 #else 250 struct sigaction sa; 251 sigset_t sig_set; 252 253 sigemptyset(&sa.sa_mask); 254 sa.sa_flags = 0; 255 sa.sa_handler = SigHandler; 256 if (sigaction(SIGCHLD, &sa, NULL)) 257 ASSERT(0, "sigaction failed"); 258 if (sigaction(SIGALRM, &sa, NULL)) 259 ASSERT(0, "sigaction failed"); 260 // block SIGCHLD and SIGALRM, the mask is inherited by worker threads 261 sigemptyset(&sig_set); 262 sigaddset(&sig_set, SIGCHLD); 263 sigaddset(&sig_set, SIGALRM); 264 if (pthread_sigmask(SIG_BLOCK, &sig_set, NULL)) 265 ASSERT(0, "pthread_sigmask failed"); 266 #endif 267 utils::suppress_unused_warning(child); 268 for (int threads=MinThread; threads<=MaxThread; threads+=MinThread) { 269 for (int i=0; i<20; i++) { 270 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, threads); 271 { 272 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get(); 273 bool ok = tbb::finalize( sch, std::nothrow ); 274 ASSERT(ok, NULL); 275 } 276 tbb::task_scheduler_handle sch = tbb::task_scheduler_handle::get(); 277 CallParallelFor(); 278 bool ok = tbb::finalize( sch, std::nothrow ); 279 ASSERT(ok, NULL); 280 #if _WIN32||_WIN64 281 // check that there is no alive threads after terminate() 282 for (TidTableType::const_iterator it = tidTable.begin(); 283 it != tidTable.end(); ++it) { 284 if (masterTid != it->first) { 285 ASSERT(threadTerminated(it->second.h), NULL); 286 } 287 } 288 tidTable.clear(); 289 #else // _WIN32||_WIN64 290 if (child) 291 exit(0); 292 else { 293 pid_t pid = fork(); 294 if (!pid) { 295 i = -1; 296 child = true; 297 } else { 298 int sig; 299 pid_t w_ret = 0; 300 // wait for SIGCHLD up to timeout 301 alarm(30); 302 if (0 != sigwait(&sig_set, &sig)) 303 ASSERT(0, "sigwait failed"); 304 alarm(0); 305 w_ret = waitpid(pid, NULL, WNOHANG); 306 ASSERT(w_ret>=0, "waitpid failed"); 307 if (!w_ret) { 308 ASSERT(!kill(pid, SIGKILL), NULL); 309 w_ret = waitpid(pid, NULL, 0); 310 ASSERT(w_ret!=-1, "waitpid failed"); 311 312 ASSERT(0, "Hang after fork"); 313 } 314 // clean pending signals (if any occurs since sigwait) 315 sigset_t p_mask; 316 for (;;) { 317 sigemptyset(&p_mask); 318 sigpending(&p_mask); 319 if (sigismember(&p_mask, SIGALRM) 320 || sigismember(&p_mask, SIGCHLD)) { 321 if (0 != sigwait(&p_mask, &sig)) 322 ASSERT(0, "sigwait failed"); 323 } else 324 break; 325 } 326 } 327 } 328 #endif // _WIN32||_WIN64 329 } 330 } 331 // auto initialization at this point 332 TestAutoInit(); 333 334 return 0; 335 } 336