1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/ThreadPool.h"
10 
11 #include "llvm/ADT/DenseSet.h"
12 #include "llvm/ADT/STLExtras.h"
13 #include "llvm/ADT/SmallVector.h"
14 #include "llvm/ADT/Triple.h"
15 #include "llvm/ADT/Twine.h"
16 #include "llvm/Support/Host.h"
17 #include "llvm/Support/TargetSelect.h"
18 #include "llvm/Support/Threading.h"
19 
20 #include "gtest/gtest.h"
21 
22 using namespace llvm;
23 
24 // Fixture for the unittests, allowing to *temporarily* disable the unittests
25 // on a particular platform
26 class ThreadPoolTest : public testing::Test {
27   Triple Host;
28   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
29   SmallVector<Triple::OSType, 4> UnsupportedOSs;
30   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
31 protected:
32   // This is intended for platform as a temporary "XFAIL"
33   bool isUnsupportedOSOrEnvironment() {
34     Triple Host(Triple::normalize(sys::getProcessTriple()));
35 
36     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
37         UnsupportedEnvironments.end())
38       return true;
39 
40     if (is_contained(UnsupportedOSs, Host.getOS()))
41       return true;
42 
43     if (is_contained(UnsupportedArchs, Host.getArch()))
44       return true;
45 
46     return false;
47   }
48 
49   ThreadPoolTest() {
50     // Add unsupported configuration here, example:
51     //   UnsupportedArchs.push_back(Triple::x86_64);
52 
53     // See https://llvm.org/bugs/show_bug.cgi?id=25829
54     UnsupportedArchs.push_back(Triple::ppc64le);
55     UnsupportedArchs.push_back(Triple::ppc64);
56   }
57 
58   /// Make sure this thread not progress faster than the main thread.
59   void waitForMainThread() {
60     std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
61     WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
62   }
63 
64   /// Set the readiness of the main thread.
65   void setMainThreadReady() {
66     {
67       std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
68       MainThreadReady = true;
69     }
70     WaitMainThread.notify_all();
71   }
72 
73   void SetUp() override { MainThreadReady = false; }
74 
75   void RunOnAllSockets(ThreadPoolStrategy S);
76 
77   std::condition_variable WaitMainThread;
78   std::mutex WaitMainThreadMutex;
79   bool MainThreadReady = false;
80 };
81 
82 #define CHECK_UNSUPPORTED() \
83   do { \
84     if (isUnsupportedOSOrEnvironment()) \
85       return; \
86   } while (0); \
87 
88 TEST_F(ThreadPoolTest, AsyncBarrier) {
89   CHECK_UNSUPPORTED();
90   // test that async & barrier work together properly.
91 
92   std::atomic_int checked_in{0};
93 
94   ThreadPool Pool;
95   for (size_t i = 0; i < 5; ++i) {
96     Pool.async([this, &checked_in] {
97       waitForMainThread();
98       ++checked_in;
99     });
100   }
101   ASSERT_EQ(0, checked_in);
102   setMainThreadReady();
103   Pool.wait();
104   ASSERT_EQ(5, checked_in);
105 }
106 
107 static void TestFunc(std::atomic_int &checked_in, int i) { checked_in += i; }
108 
109 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
110   CHECK_UNSUPPORTED();
111   // Test that async works with a function requiring multiple parameters.
112   std::atomic_int checked_in{0};
113 
114   ThreadPool Pool;
115   for (size_t i = 0; i < 5; ++i) {
116     Pool.async(TestFunc, std::ref(checked_in), i);
117   }
118   Pool.wait();
119   ASSERT_EQ(10, checked_in);
120 }
121 
122 TEST_F(ThreadPoolTest, Async) {
123   CHECK_UNSUPPORTED();
124   ThreadPool Pool;
125   std::atomic_int i{0};
126   Pool.async([this, &i] {
127     waitForMainThread();
128     ++i;
129   });
130   Pool.async([&i] { ++i; });
131   ASSERT_NE(2, i.load());
132   setMainThreadReady();
133   Pool.wait();
134   ASSERT_EQ(2, i.load());
135 }
136 
137 TEST_F(ThreadPoolTest, GetFuture) {
138   CHECK_UNSUPPORTED();
139   ThreadPool Pool(hardware_concurrency(2));
140   std::atomic_int i{0};
141   Pool.async([this, &i] {
142     waitForMainThread();
143     ++i;
144   });
145   // Force the future using get()
146   Pool.async([&i] { ++i; }).get();
147   ASSERT_NE(2, i.load());
148   setMainThreadReady();
149   Pool.wait();
150   ASSERT_EQ(2, i.load());
151 }
152 
153 TEST_F(ThreadPoolTest, PoolDestruction) {
154   CHECK_UNSUPPORTED();
155   // Test that we are waiting on destruction
156   std::atomic_int checked_in{0};
157   {
158     ThreadPool Pool;
159     for (size_t i = 0; i < 5; ++i) {
160       Pool.async([this, &checked_in] {
161         waitForMainThread();
162         ++checked_in;
163       });
164     }
165     ASSERT_EQ(0, checked_in);
166     setMainThreadReady();
167   }
168   ASSERT_EQ(5, checked_in);
169 }
170 
171 #if LLVM_ENABLE_THREADS == 1
172 
173 void ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
174   // FIXME: Skip these tests on non-Windows because multi-socket system were not
175   // tested on Unix yet, and llvm::get_thread_affinity_mask() isn't implemented
176   // for Unix.
177   Triple Host(Triple::normalize(sys::getProcessTriple()));
178   if (!Host.isOSWindows())
179     return;
180 
181   llvm::DenseSet<llvm::BitVector> ThreadsUsed;
182   std::mutex Lock;
183   {
184     std::condition_variable AllThreads;
185     std::mutex AllThreadsLock;
186     unsigned Active = 0;
187 
188     ThreadPool Pool(S);
189     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
190       Pool.async([&] {
191         {
192           std::lock_guard<std::mutex> Guard(AllThreadsLock);
193           ++Active;
194           AllThreads.notify_one();
195         }
196         waitForMainThread();
197         std::lock_guard<std::mutex> Guard(Lock);
198         auto Mask = llvm::get_thread_affinity_mask();
199         ThreadsUsed.insert(Mask);
200       });
201     }
202     ASSERT_EQ(true, ThreadsUsed.empty());
203     {
204       std::unique_lock<std::mutex> Guard(AllThreadsLock);
205       AllThreads.wait(Guard,
206                       [&]() { return Active == S.compute_thread_count(); });
207     }
208     setMainThreadReady();
209   }
210   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
211 }
212 
213 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
214   CHECK_UNSUPPORTED();
215   RunOnAllSockets({});
216 }
217 
218 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
219   CHECK_UNSUPPORTED();
220   RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
221 }
222 
223 #endif
224