//========- unittests/Support/ThreadPool.cpp - ThreadPool.h tests ----========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/ThreadPool.h"
10 
11 #include "llvm/ADT/STLExtras.h"
12 #include "llvm/ADT/SetVector.h"
13 #include "llvm/ADT/SmallVector.h"
14 #include "llvm/ADT/Triple.h"
15 #include "llvm/Support/CommandLine.h"
16 #include "llvm/Support/Host.h"
17 #include "llvm/Support/Program.h"
18 #include "llvm/Support/TargetSelect.h"
19 #include "llvm/Support/Threading.h"
20 
21 #include "gtest/gtest.h"
22 
23 using namespace llvm;
24 
25 // Fixture for the unittests, allowing to *temporarily* disable the unittests
26 // on a particular platform
27 class ThreadPoolTest : public testing::Test {
28   Triple Host;
29   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
30   SmallVector<Triple::OSType, 4> UnsupportedOSs;
31   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
32 protected:
33   // This is intended for platform as a temporary "XFAIL"
34   bool isUnsupportedOSOrEnvironment() {
35     Triple Host(Triple::normalize(sys::getProcessTriple()));
36 
37     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
38         UnsupportedEnvironments.end())
39       return true;
40 
41     if (is_contained(UnsupportedOSs, Host.getOS()))
42       return true;
43 
44     if (is_contained(UnsupportedArchs, Host.getArch()))
45       return true;
46 
47     return false;
48   }
49 
50   bool isWindows() {
51     // FIXME: Skip some tests below on non-Windows because multi-socket systems
52     // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
53     // isn't implemented for Unix.
54     Triple Host(Triple::normalize(sys::getProcessTriple()));
55     return Host.isOSWindows();
56   }
57 
58   ThreadPoolTest() {
59     // Add unsupported configuration here, example:
60     //   UnsupportedArchs.push_back(Triple::x86_64);
61 
62     // See https://llvm.org/bugs/show_bug.cgi?id=25829
63     UnsupportedArchs.push_back(Triple::ppc64le);
64     UnsupportedArchs.push_back(Triple::ppc64);
65   }
66 
67   /// Make sure this thread not progress faster than the main thread.
68   void waitForMainThread() {
69     std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
70     WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
71   }
72 
73   /// Set the readiness of the main thread.
74   void setMainThreadReady() {
75     {
76       std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
77       MainThreadReady = true;
78     }
79     WaitMainThread.notify_all();
80   }
81 
82   void SetUp() override { MainThreadReady = false; }
83 
84   std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
85 
86   std::condition_variable WaitMainThread;
87   std::mutex WaitMainThreadMutex;
88   bool MainThreadReady = false;
89 };
90 
// Return from the enclosing test body when the fixture reports the host as
// unsupported. Expands to a call to isUnsupportedOSOrEnvironment(), so it is
// only usable where that name is in scope (e.g. inside a TEST_F body).
//
// There is deliberately no semicolon after `while (0)`: the caller's own `;`
// terminates the statement, so the macro behaves as a single statement even
// in an unbraced if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      return;                                                                  \
  } while (0)
96 
// Return from the enclosing test body unless the host is Windows. Expands to
// a call to isWindows(), so it is only usable where that name is in scope
// (e.g. inside a TEST_F body).
//
// There is deliberately no semicolon after `while (0)`: the caller's own `;`
// terminates the statement, so the macro behaves as a single statement even
// in an unbraced if/else.
#define SKIP_NON_WINDOWS()                                                     \
  do {                                                                         \
    if (!isWindows())                                                          \
      return;                                                                  \
  } while (0)
102 
103 TEST_F(ThreadPoolTest, AsyncBarrier) {
104   CHECK_UNSUPPORTED();
105   // test that async & barrier work together properly.
106 
107   std::atomic_int checked_in{0};
108 
109   ThreadPool Pool;
110   for (size_t i = 0; i < 5; ++i) {
111     Pool.async([this, &checked_in] {
112       waitForMainThread();
113       ++checked_in;
114     });
115   }
116   ASSERT_EQ(0, checked_in);
117   setMainThreadReady();
118   Pool.wait();
119   ASSERT_EQ(5, checked_in);
120 }
121 
/// Atomically adds \p i to the shared counter \p checked_in.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
123 
124 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
125   CHECK_UNSUPPORTED();
126   // Test that async works with a function requiring multiple parameters.
127   std::atomic_int checked_in{0};
128 
129   ThreadPool Pool;
130   for (size_t i = 0; i < 5; ++i) {
131     Pool.async(TestFunc, std::ref(checked_in), i);
132   }
133   Pool.wait();
134   ASSERT_EQ(10, checked_in);
135 }
136 
137 TEST_F(ThreadPoolTest, Async) {
138   CHECK_UNSUPPORTED();
139   ThreadPool Pool;
140   std::atomic_int i{0};
141   Pool.async([this, &i] {
142     waitForMainThread();
143     ++i;
144   });
145   Pool.async([&i] { ++i; });
146   ASSERT_NE(2, i.load());
147   setMainThreadReady();
148   Pool.wait();
149   ASSERT_EQ(2, i.load());
150 }
151 
152 TEST_F(ThreadPoolTest, GetFuture) {
153   CHECK_UNSUPPORTED();
154   ThreadPool Pool(hardware_concurrency(2));
155   std::atomic_int i{0};
156   Pool.async([this, &i] {
157     waitForMainThread();
158     ++i;
159   });
160   // Force the future using get()
161   Pool.async([&i] { ++i; }).get();
162   ASSERT_NE(2, i.load());
163   setMainThreadReady();
164   Pool.wait();
165   ASSERT_EQ(2, i.load());
166 }
167 
168 TEST_F(ThreadPoolTest, PoolDestruction) {
169   CHECK_UNSUPPORTED();
170   // Test that we are waiting on destruction
171   std::atomic_int checked_in{0};
172   {
173     ThreadPool Pool;
174     for (size_t i = 0; i < 5; ++i) {
175       Pool.async([this, &checked_in] {
176         waitForMainThread();
177         ++checked_in;
178       });
179     }
180     ASSERT_EQ(0, checked_in);
181     setMainThreadReady();
182   }
183   ASSERT_EQ(5, checked_in);
184 }
185 
186 #if LLVM_ENABLE_THREADS == 1
187 
188 std::vector<llvm::BitVector>
189 ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
190   llvm::SetVector<llvm::BitVector> ThreadsUsed;
191   std::mutex Lock;
192   {
193     std::condition_variable AllThreads;
194     std::mutex AllThreadsLock;
195     unsigned Active = 0;
196 
197     ThreadPool Pool(S);
198     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
199       Pool.async([&] {
200         {
201           std::lock_guard<std::mutex> Guard(AllThreadsLock);
202           ++Active;
203           AllThreads.notify_one();
204         }
205         waitForMainThread();
206         std::lock_guard<std::mutex> Guard(Lock);
207         auto Mask = llvm::get_thread_affinity_mask();
208         ThreadsUsed.insert(Mask);
209       });
210     }
211     EXPECT_EQ(true, ThreadsUsed.empty());
212     {
213       std::unique_lock<std::mutex> Guard(AllThreadsLock);
214       AllThreads.wait(Guard,
215                       [&]() { return Active == S.compute_thread_count(); });
216     }
217     setMainThreadReady();
218   }
219   return ThreadsUsed.takeVector();
220 }
221 
222 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
223   CHECK_UNSUPPORTED();
224   SKIP_NON_WINDOWS();
225   std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
226   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
227 }
228 
229 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
230   CHECK_UNSUPPORTED();
231   SKIP_NON_WINDOWS();
232   std::vector<llvm::BitVector> ThreadsUsed =
233       RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
234   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
235 }
236 
237 // From TestMain.cpp.
238 extern const char *TestMainArgv0;
239 
240 // Just a reachable symbol to ease resolving of the executable's path.
241 static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");
242 
243 #ifdef _MSC_VER
244 #define setenv(name, var, ignore) _putenv_s(name, var)
245 #endif
246 
247 TEST_F(ThreadPoolTest, AffinityMask) {
248   CHECK_UNSUPPORTED();
249 
250   // FIXME: implement AffinityMask in Support/Unix/Program.inc
251   SKIP_NON_WINDOWS();
252 
253   // Skip this test if less than 4 threads are available.
254   if (llvm::hardware_concurrency().compute_thread_count() < 4)
255     return;
256 
257   using namespace llvm::sys;
258   if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
259     std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
260     // Ensure the threads only ran on CPUs 0-3.
261     for (auto &It : ThreadsUsed)
262       ASSERT_LT(It.getData().front(), 16UL);
263     return;
264   }
265   std::string Executable =
266       sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
267   StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
268 
269   // Add environment variable to the environment of the child process.
270   int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
271   ASSERT_EQ(Res, 0);
272 
273   std::string Error;
274   bool ExecutionFailed;
275   BitVector Affinity;
276   Affinity.resize(4);
277   Affinity.set(0, 4); // Use CPUs 0,1,2,3.
278   int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
279                                 &ExecutionFailed, nullptr, &Affinity);
280   ASSERT_EQ(0, Ret);
281 }
282 
283 #endif // #if LLVM_ENABLE_THREADS == 1
284