1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
#include "llvm/Support/ThreadPool.h"

#include "llvm/ADT/STLExtras.h"
#include "llvm/ADT/SetVector.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/ADT/Triple.h"
#include "llvm/Support/CommandLine.h"
#include "llvm/Support/ErrorHandling.h"
#include "llvm/Support/Host.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/TargetSelect.h"
#include "llvm/Support/Threading.h"

#include <chrono>
#include <thread>

#include "gtest/gtest.h"
25 
26 using namespace llvm;
27 
28 // Fixture for the unittests, allowing to *temporarily* disable the unittests
29 // on a particular platform
30 class ThreadPoolTest : public testing::Test {
31   Triple Host;
32   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
33   SmallVector<Triple::OSType, 4> UnsupportedOSs;
34   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
35 
36 protected:
37   // This is intended for platform as a temporary "XFAIL"
38   bool isUnsupportedOSOrEnvironment() {
39     Triple Host(Triple::normalize(sys::getProcessTriple()));
40 
41     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
42         UnsupportedEnvironments.end())
43       return true;
44 
45     if (is_contained(UnsupportedOSs, Host.getOS()))
46       return true;
47 
48     if (is_contained(UnsupportedArchs, Host.getArch()))
49       return true;
50 
51     return false;
52   }
53 
54   ThreadPoolTest() {
55     // Add unsupported configuration here, example:
56     //   UnsupportedArchs.push_back(Triple::x86_64);
57 
58     // See https://llvm.org/bugs/show_bug.cgi?id=25829
59     UnsupportedArchs.push_back(Triple::ppc64le);
60     UnsupportedArchs.push_back(Triple::ppc64);
61   }
62 
63   /// Make sure this thread not progress faster than the main thread.
64   void waitForMainThread() { waitForPhase(1); }
65 
66   /// Set the readiness of the main thread.
67   void setMainThreadReady() { setPhase(1); }
68 
69   /// Wait until given phase is set using setPhase(); first "main" phase is 1.
70   /// See also PhaseResetHelper below.
71   void waitForPhase(int Phase) {
72     std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
73     CurrentPhaseCondition.wait(
74         LockGuard, [&] { return CurrentPhase == Phase || CurrentPhase < 0; });
75   }
76   /// If a thread waits on another phase, the test could bail out on a failed
77   /// assertion and ThreadPool destructor would wait() on all threads, which
78   /// would deadlock on the task waiting. Create this helper to automatically
79   /// reset the phase and unblock such threads.
80   struct PhaseResetHelper {
81     PhaseResetHelper(ThreadPoolTest *test) : test(test) {}
82     ~PhaseResetHelper() { test->setPhase(-1); }
83     ThreadPoolTest *test;
84   };
85 
86   /// Advance to the given phase.
87   void setPhase(int Phase) {
88     {
89       std::unique_lock<std::mutex> LockGuard(CurrentPhaseMutex);
90       assert(Phase == CurrentPhase + 1 || Phase < 0);
91       CurrentPhase = Phase;
92     }
93     CurrentPhaseCondition.notify_all();
94   }
95 
96   void SetUp() override { CurrentPhase = 0; }
97 
98   std::vector<llvm::BitVector> RunOnAllSockets(ThreadPoolStrategy S);
99 
100   std::condition_variable CurrentPhaseCondition;
101   std::mutex CurrentPhaseMutex;
102   int CurrentPhase; // -1 = error, 0 = setup, 1 = ready, 2+ = custom
103 };
104 
// Skips the current test via GTEST_SKIP() when the host matches an entry
// registered as unsupported in the ThreadPoolTest constructor.
// Written as do { ... } while (0) *without* a trailing semicolon so that
// `CHECK_UNSUPPORTED();` expands to exactly one statement, which keeps it
// valid inside an unbraced if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      GTEST_SKIP();                                                            \
  } while (0)
110 
111 TEST_F(ThreadPoolTest, AsyncBarrier) {
112   CHECK_UNSUPPORTED();
113   // test that async & barrier work together properly.
114 
115   std::atomic_int checked_in{0};
116 
117   ThreadPool Pool;
118   for (size_t i = 0; i < 5; ++i) {
119     Pool.async([this, &checked_in] {
120       waitForMainThread();
121       ++checked_in;
122     });
123   }
124   ASSERT_EQ(0, checked_in);
125   setMainThreadReady();
126   Pool.wait();
127   ASSERT_EQ(5, checked_in);
128 }
129 
/// Atomically adds \p i to \p checked_in. Used to exercise async() with a
/// callable that takes extra arguments.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
131 
132 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
133   CHECK_UNSUPPORTED();
134   // Test that async works with a function requiring multiple parameters.
135   std::atomic_int checked_in{0};
136 
137   ThreadPool Pool;
138   for (size_t i = 0; i < 5; ++i) {
139     Pool.async(TestFunc, std::ref(checked_in), i);
140   }
141   Pool.wait();
142   ASSERT_EQ(10, checked_in);
143 }
144 
145 TEST_F(ThreadPoolTest, Async) {
146   CHECK_UNSUPPORTED();
147   ThreadPool Pool;
148   std::atomic_int i{0};
149   Pool.async([this, &i] {
150     waitForMainThread();
151     ++i;
152   });
153   Pool.async([&i] { ++i; });
154   ASSERT_NE(2, i.load());
155   setMainThreadReady();
156   Pool.wait();
157   ASSERT_EQ(2, i.load());
158 }
159 
160 TEST_F(ThreadPoolTest, GetFuture) {
161   CHECK_UNSUPPORTED();
162   ThreadPool Pool(hardware_concurrency(2));
163   std::atomic_int i{0};
164   Pool.async([this, &i] {
165     waitForMainThread();
166     ++i;
167   });
168   // Force the future using get()
169   Pool.async([&i] { ++i; }).get();
170   ASSERT_NE(2, i.load());
171   setMainThreadReady();
172   Pool.wait();
173   ASSERT_EQ(2, i.load());
174 }
175 
176 TEST_F(ThreadPoolTest, GetFutureWithResult) {
177   CHECK_UNSUPPORTED();
178   ThreadPool Pool(hardware_concurrency(2));
179   auto F1 = Pool.async([] { return 1; });
180   auto F2 = Pool.async([] { return 2; });
181 
182   setMainThreadReady();
183   Pool.wait();
184   ASSERT_EQ(1, F1.get());
185   ASSERT_EQ(2, F2.get());
186 }
187 
188 TEST_F(ThreadPoolTest, GetFutureWithResultAndArgs) {
189   CHECK_UNSUPPORTED();
190   ThreadPool Pool(hardware_concurrency(2));
191   auto Fn = [](int x) { return x; };
192   auto F1 = Pool.async(Fn, 1);
193   auto F2 = Pool.async(Fn, 2);
194 
195   setMainThreadReady();
196   Pool.wait();
197   ASSERT_EQ(1, F1.get());
198   ASSERT_EQ(2, F2.get());
199 }
200 
201 TEST_F(ThreadPoolTest, PoolDestruction) {
202   CHECK_UNSUPPORTED();
203   // Test that we are waiting on destruction
204   std::atomic_int checked_in{0};
205   {
206     ThreadPool Pool;
207     for (size_t i = 0; i < 5; ++i) {
208       Pool.async([this, &checked_in] {
209         waitForMainThread();
210         ++checked_in;
211       });
212     }
213     ASSERT_EQ(0, checked_in);
214     setMainThreadReady();
215   }
216   ASSERT_EQ(5, checked_in);
217 }
218 
219 // Check running tasks in different groups.
220 TEST_F(ThreadPoolTest, Groups) {
221   CHECK_UNSUPPORTED();
222   // Need at least two threads, as the task in group2
223   // might block a thread until all tasks in group1 finish.
224   ThreadPoolStrategy S = hardware_concurrency(2);
225   if (S.compute_thread_count() < 2)
226     return;
227   ThreadPool Pool(S);
228   PhaseResetHelper Helper(this);
229   ThreadPoolTaskGroup Group1(Pool);
230   ThreadPoolTaskGroup Group2(Pool);
231 
232   // Check that waiting for an empty group is a no-op.
233   Group1.wait();
234 
235   std::atomic_int checked_in1{0};
236   std::atomic_int checked_in2{0};
237 
238   for (size_t i = 0; i < 5; ++i) {
239     Group1.async([this, &checked_in1] {
240       waitForMainThread();
241       ++checked_in1;
242     });
243   }
244   Group2.async([this, &checked_in2] {
245     waitForPhase(2);
246     ++checked_in2;
247   });
248   ASSERT_EQ(0, checked_in1);
249   ASSERT_EQ(0, checked_in2);
250   // Start first group and wait for it.
251   setMainThreadReady();
252   Group1.wait();
253   ASSERT_EQ(5, checked_in1);
254   // Second group has not yet finished, start it and wait for it.
255   ASSERT_EQ(0, checked_in2);
256   setPhase(2);
257   Group2.wait();
258   ASSERT_EQ(5, checked_in1);
259   ASSERT_EQ(1, checked_in2);
260 }
261 
262 // Check recursive tasks.
263 TEST_F(ThreadPoolTest, RecursiveGroups) {
264   CHECK_UNSUPPORTED();
265   ThreadPool Pool;
266   ThreadPoolTaskGroup Group(Pool);
267 
268   std::atomic_int checked_in1{0};
269 
270   for (size_t i = 0; i < 5; ++i) {
271     Group.async([this, &Pool, &checked_in1] {
272       waitForMainThread();
273 
274       ThreadPoolTaskGroup LocalGroup(Pool);
275 
276       // Check that waiting for an empty group is a no-op.
277       LocalGroup.wait();
278 
279       std::atomic_int checked_in2{0};
280       for (size_t i = 0; i < 5; ++i) {
281         LocalGroup.async([&checked_in2] { ++checked_in2; });
282       }
283       LocalGroup.wait();
284       ASSERT_EQ(5, checked_in2);
285 
286       ++checked_in1;
287     });
288   }
289   ASSERT_EQ(0, checked_in1);
290   setMainThreadReady();
291   Group.wait();
292   ASSERT_EQ(5, checked_in1);
293 }
294 
295 TEST_F(ThreadPoolTest, RecursiveWaitDeadlock) {
296   CHECK_UNSUPPORTED();
297   ThreadPoolStrategy S = hardware_concurrency(2);
298   if (S.compute_thread_count() < 2)
299     return;
300   ThreadPool Pool(S);
301   PhaseResetHelper Helper(this);
302   ThreadPoolTaskGroup Group(Pool);
303 
304   // Test that a thread calling wait() for a group and is waiting for more tasks
305   // returns when the last task finishes in a different thread while the waiting
306   // thread was waiting for more tasks to process while waiting.
307 
308   // Task A runs in the first thread. It finishes and leaves
309   // the background thread waiting for more tasks.
310   Group.async([this] {
311     waitForMainThread();
312     setPhase(2);
313   });
314   // Task B is run in a second thread, it launches yet another
315   // task C in a different group, which will be handled by the waiting
316   // thread started above.
317   Group.async([this, &Pool] {
318     waitForPhase(2);
319     ThreadPoolTaskGroup LocalGroup(Pool);
320     LocalGroup.async([this] {
321       waitForPhase(3);
322       // Give the other thread enough time to check that there's no task
323       // to process and suspend waiting for a notification. This is indeed racy,
324       // but probably the best that can be done.
325       std::this_thread::sleep_for(std::chrono::milliseconds(10));
326     });
327     // And task B only now will wait for the tasks in the group (=task C)
328     // to finish. This test checks that it does not deadlock. If the
329     // `NotifyGroup` handling in ThreadPool::processTasks() didn't take place,
330     // this task B would be stuck waiting for tasks to arrive.
331     setPhase(3);
332     LocalGroup.wait();
333   });
334   setMainThreadReady();
335   Group.wait();
336 }
337 
338 #if LLVM_ENABLE_THREADS == 1
339 
340 // FIXME: Skip some tests below on non-Windows because multi-socket systems
341 // were not fully tested on Unix yet, and llvm::get_thread_affinity_mask()
342 // isn't implemented for Unix (need AffinityMask in Support/Unix/Program.inc).
343 #ifdef _WIN32
344 
345 std::vector<llvm::BitVector>
346 ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
347   llvm::SetVector<llvm::BitVector> ThreadsUsed;
348   std::mutex Lock;
349   {
350     std::condition_variable AllThreads;
351     std::mutex AllThreadsLock;
352     unsigned Active = 0;
353 
354     ThreadPool Pool(S);
355     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
356       Pool.async([&] {
357         {
358           std::lock_guard<std::mutex> Guard(AllThreadsLock);
359           ++Active;
360           AllThreads.notify_one();
361         }
362         waitForMainThread();
363         std::lock_guard<std::mutex> Guard(Lock);
364         auto Mask = llvm::get_thread_affinity_mask();
365         ThreadsUsed.insert(Mask);
366       });
367     }
368     EXPECT_EQ(true, ThreadsUsed.empty());
369     {
370       std::unique_lock<std::mutex> Guard(AllThreadsLock);
371       AllThreads.wait(Guard,
372                       [&]() { return Active == S.compute_thread_count(); });
373     }
374     setMainThreadReady();
375   }
376   return ThreadsUsed.takeVector();
377 }
378 
379 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
380   CHECK_UNSUPPORTED();
381   std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
382   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
383 }
384 
385 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
386   CHECK_UNSUPPORTED();
387   std::vector<llvm::BitVector> ThreadsUsed =
388       RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
389   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
390 }
391 
// From TestMain.cpp. Passed to sys::fs::getMainExecutable() by the
// AffinityMask test to locate this test binary so it can be re-launched.
extern const char *TestMainArgv0;

// Just a reachable symbol to ease resolving of the executable's path: its
// address is passed to sys::fs::getMainExecutable() together with
// TestMainArgv0.
static cl::opt<std::string> ThreadPoolTestStringArg1("thread-pool-string-arg1");

// Map the POSIX setenv() call used below onto the MSVC CRT's _putenv_s().
// The third ("overwrite") argument is dropped, since _putenv_s() takes only
// the name and value. This section is already _WIN32-only; the inner guard
// keeps the shim correct if the enclosing guard is ever lifted.
#ifdef _WIN32
#define setenv(name, var, ignore) _putenv_s(name, var)
#endif
401 
402 TEST_F(ThreadPoolTest, AffinityMask) {
403   CHECK_UNSUPPORTED();
404 
405   // Skip this test if less than 4 threads are available.
406   if (llvm::hardware_concurrency().compute_thread_count() < 4)
407     GTEST_SKIP();
408 
409   using namespace llvm::sys;
410   if (getenv("LLVM_THREADPOOL_AFFINITYMASK")) {
411     std::vector<llvm::BitVector> ThreadsUsed = RunOnAllSockets({});
412     // Ensure the threads only ran on CPUs 0-3.
413     // NOTE: Don't use ASSERT* here because this runs in a subprocess,
414     // and will show up as un-executed in the parent.
415     assert(llvm::all_of(ThreadsUsed,
416                         [](auto &T) { return T.getData().front() < 16UL; }) &&
417            "Threads ran on more CPUs than expected! The affinity mask does not "
418            "seem to work.");
419     GTEST_SKIP();
420   }
421   std::string Executable =
422       sys::fs::getMainExecutable(TestMainArgv0, &ThreadPoolTestStringArg1);
423   StringRef argv[] = {Executable, "--gtest_filter=ThreadPoolTest.AffinityMask"};
424 
425   // Add environment variable to the environment of the child process.
426   int Res = setenv("LLVM_THREADPOOL_AFFINITYMASK", "1", false);
427   ASSERT_EQ(Res, 0);
428 
429   std::string Error;
430   bool ExecutionFailed;
431   BitVector Affinity;
432   Affinity.resize(4);
433   Affinity.set(0, 4); // Use CPUs 0,1,2,3.
434   int Ret = sys::ExecuteAndWait(Executable, argv, {}, {}, 0, 0, &Error,
435                                 &ExecutionFailed, nullptr, &Affinity);
436   ASSERT_EQ(0, Ret);
437 }
438 
439 #endif // #ifdef _WIN32
440 #endif // #if LLVM_ENABLE_THREADS == 1
441