//===-- unittests/Support/ThreadPool.cpp - ThreadPool.h tests ------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/ADT/Triple.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/Host.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"

#include <chrono>
#include <thread>

#include "gtest/gtest.h"

using namespace llvm;

// Fixture for the unittests, allowing the unittests to be *temporarily*
// disabled on a particular platform.
class ThreadPoolTest : public testing::Test {
  Triple Host;
  SmallVector<Triple::ArchType, 4> UnsupportedArchs;
  SmallVector<Triple::OSType, 4> UnsupportedOSs;
  SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;

protected:
  // This is intended as a temporary "XFAIL" for a platform.
  bool isUnsupportedOSOrEnvironment() {
    Triple Host(Triple::normalize(sys::getProcessTriple()));

    if (is_contained(UnsupportedEnvironments, Host.getEnvironment()))
      return true;

    if (is_contained(UnsupportedOSs, Host.getOS()))
      return true;

    if (is_contained(UnsupportedArchs, Host.getArch()))
      return true;

    return false;
  }

  ThreadPoolTest() {
    // Add unsupported configurations here, for example:
    // UnsupportedArchs.push_back(Triple::x86_64);

    // See https://llvm.org/bugs/show_bug.cgi?id=25829
    UnsupportedArchs.push_back(Triple::ppc64le);
    UnsupportedArchs.push_back(Triple::ppc64);
  }

  /// Make sure this thread does not progress faster than the main thread.
  void waitForMainThread() { waitForPhase(1); }

  /// Set the readiness of the main thread.
  void setMainThreadReady() { setPhase(1); }

  /// Wait until the given phase is set using setPhase(); the first "main"
  /// phase is 1. See also PhaseResetHelper below.
  void waitForPhase(int Phase) {
    std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
    CurrentPhaseCondition.wait(
        LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
  }
  /// If a thread waits on another phase, the test could bail out on a failed
  /// assertion and the ThreadPool destructor would then wait() on all threads,
  /// deadlocking on the task that is still waiting. This helper automatically
  /// resets the phase and unblocks such threads.
  struct PhaseResetHelper {
    PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
    ~PhaseResetHelper() { test->setPhase(-1); }
    ThreadPoolTest *test;
  };

  /// Advance to the given phase.
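  /// Phases must be advanced one step at a time; setting a negative phase
  /// releases all waiters (see PhaseResetHelper above).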
  void setPhase(int Phase) {
    {
      std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
      assert(Phase == CurrentPhase + 1 || Phase < 0);
      CurrentPhase = Phase;
    }
    CurrentPhaseCondition.notify_all();
  }

  void SetUp() override { CurrentPhase = 0; }

  std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);

  std::condition_variable CurrentPhaseCondition;
  std::mutex CurrentPhaseMutex;
  int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom

};

#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)

TEST_F(ThreadPoolTest, AsyncBarrier) {
  CHECK_UNSUPPORTED();
  // Test that async and barrier work together properly.

  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async([this, &checked_in] {
      waitForMainThread();
      ++checked_in;
    });
  }
  ASSERT_EQ(0, checked_in);
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(5, checked_in);
}

static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }

TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
  CHECK_UNSUPPORTED();
  // Test that async works with a function requiring multiple parameters.
  std::atomic_int checked_in{0};

  ThreadPool Pool;
  for (size_t i = 0; i < 5; ++i) {
    Pool.async(TestFunc, std::ref(checked_in), i);
  }
  Pool.wait();
  ASSERT_EQ(10, checked_in);
}

TEST_F(ThreadPoolTest, Async) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  Pool.async([&i] { ++i; });
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFuture) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  std::atomic_int i{0};
  Pool.async([this, &i] {
    waitForMainThread();
    ++i;
  });
  // Force the future using get().
  Pool.async([&i] { ++i; }).get();
  ASSERT_NE(2, i.load());
  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(2, i.load());
}

TEST_F(ThreadPoolTest, GetFutureWithResult) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto F1 = Pool.async([] { return 1; });
  auto F2 = Pool.async([] { return 2; });

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool(hardware_concurrency(2));
  auto Fn = [](int x) { return x; };
  auto F1 = Pool.async(Fn, 1);
  auto F2 = Pool.async(Fn, 2);

  setMainThreadReady();
  Pool.wait();
  ASSERT_EQ(1, F1.get());
  ASSERT_EQ(2, F2.get());
}

TEST_F(ThreadPoolTest, PoolDestruction) {
  CHECK_UNSUPPORTED();
  // Test that we are waiting on destruction.
  std::atomic_int checked_in{0};
  {
    ThreadPool Pool;
    for (size_t i = 0; i < 5; ++i) {
      Pool.async([this, &checked_in] {
        waitForMainThread();
        ++checked_in;
      });
    }
    ASSERT_EQ(0, checked_in);
    setMainThreadReady();
  }
  ASSERT_EQ(5, checked_in);
}

// Check running tasks in different groups.
TEST_F(ThreadPoolTest, Groups) {
  CHECK_UNSUPPORTED();
  // Need at least two threads, as the task in Group2
  // might block a thread until all tasks in Group1 finish.
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    return;
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group1(Pool);
  ThreadPoolTaskGroup Group2(Pool);

  // Check that waiting for an empty group is a no-op.
  Group1.wait();

  std::atomic_int checked_in1{0};
  std::atomic_int checked_in2{0};

  for (size_t i = 0; i < 5; ++i) {
    Group1.async([this, &checked_in1] {
      waitForMainThread();
      ++checked_in1;
    });
  }
  Group2.async([this, &checked_in2] {
    waitForPhase(2);
    ++checked_in2;
  });
  ASSERT_EQ(0, checked_in1);
  ASSERT_EQ(0, checked_in2);
  // Start the first group and wait for it.
  setMainThreadReady();
  Group1.wait();
  ASSERT_EQ(5, checked_in1);
  // The second group has not finished yet; start it and wait for it.
  ASSERT_EQ(0, checked_in2);
  setPhase(2);
  Group2.wait();
  ASSERT_EQ(5, checked_in1);
  ASSERT_EQ(1, checked_in2);
}

// Check recursive tasks.
TEST_F(ThreadPoolTest, RecursiveGroups) {
  CHECK_UNSUPPORTED();
  ThreadPool Pool;
  ThreadPoolTaskGroup Group(Pool);

  std::atomic_int checked_in1{0};

  for (size_t i = 0; i < 5; ++i) {
    Group.async([this, &Pool, &checked_in1] {
      waitForMainThread();

      ThreadPoolTaskGroup LocalGroup(Pool);

      // Check that waiting for an empty group is a no-op.
      LocalGroup.wait();

      std::atomic_int checked_in2{0};
      for (size_t i = 0; i < 5; ++i) {
        LocalGroup.async([&checked_in2] { ++checked_in2; });
      }
      LocalGroup.wait();
      ASSERT_EQ(5, checked_in2);

      ++checked_in1;
    });
  }
  ASSERT_EQ(0, checked_in1);
  setMainThreadReady();
  Group.wait();
  ASSERT_EQ(5, checked_in1);
}

TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
  CHECK_UNSUPPORTED();
  ThreadPoolStrategy S = hardware_concurrency(2);
  if (S.compute_thread_count() < 2)
    return;
  ThreadPool Pool(S);
  PhaseResetHelper Helper(this);
  ThreadPoolTaskGroup Group(Pool);

  // Test that a thread which calls wait() on a group returns when the last
  // task of that group finishes in a different thread, even though the
  // waiting thread itself has no more tasks to process.

  // Task A runs in the first thread. It finishes and leaves
  // the background thread waiting for more tasks.
  Group.async([this] {
    waitForMainThread();
    setPhase(2);
  });
  // Task B runs in a second thread. It launches yet another
  // task C in a different group, which will be handled by the waiting
  // thread started above.
  Group.async([this, &Pool] {
    waitForPhase(2);
    ThreadPoolTaskGroup LocalGroup(Pool);
    LocalGroup.async([this] {
      waitForPhase(3);
      // Give the other thread enough time to check that there's no task
      // to process and suspend waiting for a notification. This is indeed
      // racy, but it is probably the best that can be done.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    });
    // Only now does task B wait for the tasks in the group (= task C)
    // to finish. This test checks that it does not deadlock. If the
    // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
    // task B would be stuck waiting for tasks to arrive.
    setPhase(3);
    LocalGroup.wait();
  });
  setMainThreadReady();
  Group.wait();
}

#if LLVM_ENABLE_THREADS == 1

// FIXME: Skip some tests below on non-Windows because multi-socket systems
// were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
// isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
#ifdef _WIN32

std::vector<llvm::BitVector>
ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
  llvm::SetVector<llvm::BitVector> ThreadsUsed;
  std::mutex Lock;
  {
    std::condition_variable AllThreads;
    std::mutex AllThreadsLock;
    unsigned Active = 0;

    ThreadPool Pool(S);
    for (size_t I = 0; I < S.compute_thread_count(); ++I) {
      Pool.async([&] {
        {
          std::lock_guard<std::mutex> Guard(AllThreadsLock);
          ++Active;
          AllThreads.notify_one();
        }
        waitForMainThread();
        std::lock_guard<std::mutex> Guard(Lock);
        auto Mask = llvm::get_thread_affinity_mask();
        ThreadsUsed.insert(Mask);
      });
    }
    EXPECT_TRUE(ThreadsUsed.empty());
    {
      std::unique_lock<std::mutex> Guard(AllThreadsLock);
      AllThreads.wait(Guard,
                      [&]() { return Active == S.compute_thread_count(); });
    }
    setMainThreadReady();
  }
  return ThreadsUsed.takeVector();
}

TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
  CHECK_UNSUPPORTED();
  std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
  CHECK_UNSUPPORTED();
  std::vector<llvm::BitVector> ThreadsUsed =
      RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
  ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
}

// From TestMain.cpp.
extern const char *TestMainArgv0;

// Just a reachable symbol to ease resolving of the executable's path.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");

#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif

TEST_F(ThreadPoolTest, AffinityMask) {
  CHECK_UNSUPPORTED();

  // Skip this test if fewer than 4 threads are available.
  if (llvm::hardware_concurrency().compute_thread_count() < 4)
    GTEST_SKIP();

  using namespace llvm::sys;
  if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
    std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
    // Ensure the threads only ran on CPUs 0-3.
    // NOTE: Don't use ASSERT* here because this runs in a subprocess
    // and would show up as un-executed in the parent.
    assert(llvm::all_of(ThreadsUsed,
                        [](auto &T) { return T.getData().front() < 16UL; }) &&
           "Threads ran on more CPUs than expected! The affinity mask does not "
           "seem to work.");
    GTEST_SKIP();
  }
  std::string Executable =
      sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
  StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};

  // Add an environment variable to the environment of the child process.
  int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
  ASSERT_EQ(Res, 0);

  std::string Error;
  bool ExecutionFailed;
  BitVector Affinity;
  Affinity.resize(4);
  Affinity.set(0, 4); // Use CPUs 0,1,2,3.
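  // Re-run this same test in a child process pinned to CPUs 0-3; the child
  // sees LLVM_THREADPOOL_AFFINITYMASK and takes the verification branch above.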
  int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
                                &ExecutionFailed, nullptr, &Affinity);
  ASSERT_EQ(0, Ret);
}

#endif // #ifdef _WIN32
#endif // #if LLVM_ENABLE_THREADS == 1