1 //========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========//
2 //
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
6 //
7 //===----------------------------------------------------------------------===//
8 
9 #include "llvm/Support/ThreadPool.h"
10 
11 #include "llvm/ADT/DenseSet.h"
12 #include "llvm/ADT/STLExtras.h"
13 #include "llvm/ADT/SmallVector.h"
14 #include "llvm/ADT/Triple.h"
15 #include "llvm/Support/Host.h"
16 #include "llvm/Support/TargetSelect.h"
17 #include "llvm/Support/Threading.h"
18 
19 #include "gtest/gtest.h"
20 
21 using namespace llvm;
22 
23 // Fixture for the unittests, allowing to *temporarily* disable the unittests
24 // on a particular platform
25 class ThreadPoolTest : public testing::Test {
26   Triple Host;
27   SmallVector<Triple::ArchType, 4> UnsupportedArchs;
28   SmallVector<Triple::OSType, 4> UnsupportedOSs;
29   SmallVector<Triple::EnvironmentType, 1> UnsupportedEnvironments;
30 protected:
31   // This is intended for platform as a temporary "XFAIL"
32   bool isUnsupportedOSOrEnvironment() {
33     Triple Host(Triple::normalize(sys::getProcessTriple()));
34 
35     if (find(UnsupportedEnvironments, Host.getEnvironment()) !=
36         UnsupportedEnvironments.end())
37       return true;
38 
39     if (is_contained(UnsupportedOSs, Host.getOS()))
40       return true;
41 
42     if (is_contained(UnsupportedArchs, Host.getArch()))
43       return true;
44 
45     return false;
46   }
47 
48   ThreadPoolTest() {
49     // Add unsupported configuration here, example:
50     //   UnsupportedArchs.push_back(Triple::x86_64);
51 
52     // See https://llvm.org/bugs/show_bug.cgi?id=25829
53     UnsupportedArchs.push_back(Triple::ppc64le);
54     UnsupportedArchs.push_back(Triple::ppc64);
55   }
56 
57   /// Make sure this thread not progress faster than the main thread.
58   void waitForMainThread() {
59     std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
60     WaitMainThread.wait(LockGuard, [&] { return MainThreadReady; });
61   }
62 
63   /// Set the readiness of the main thread.
64   void setMainThreadReady() {
65     {
66       std::unique_lock<std::mutex> LockGuard(WaitMainThreadMutex);
67       MainThreadReady = true;
68     }
69     WaitMainThread.notify_all();
70   }
71 
72   void SetUp() override { MainThreadReady = false; }
73 
74   void RunOnAllSockets(ThreadPoolStrategy S);
75 
76   std::condition_variable WaitMainThread;
77   std::mutex WaitMainThreadMutex;
78   bool MainThreadReady = false;
79 };
80 
// Return early from the enclosing (void) function when the host platform is on
// the unsupported list. Requires isUnsupportedOSOrEnvironment() to be callable
// at the point of use.
//
// The expansion intentionally carries no trailing semicolon: the caller's own
// `;` completes the do/while statement, so `CHECK_UNSUPPORTED();` is exactly
// one statement and composes correctly with if/else.
#define CHECK_UNSUPPORTED()                                                    \
  do {                                                                         \
    if (isUnsupportedOSOrEnvironment())                                        \
      return;                                                                  \
  } while (0)
86 
87 TEST_F(ThreadPoolTest, AsyncBarrier) {
88   CHECK_UNSUPPORTED();
89   // test that async & barrier work together properly.
90 
91   std::atomic_int checked_in{0};
92 
93   ThreadPool Pool;
94   for (size_t i = 0; i < 5; ++i) {
95     Pool.async([this, &checked_in] {
96       waitForMainThread();
97       ++checked_in;
98     });
99   }
100   ASSERT_EQ(0, checked_in);
101   setMainThreadReady();
102   Pool.wait();
103   ASSERT_EQ(5, checked_in);
104 }
105 
/// Atomically add \p i to \p checked_in. Used as a plain-function task that
/// takes more than one argument.
static void TestFunc(std::atomic_int &checked_in, int i) {
  checked_in.fetch_add(i);
}
107 
108 TEST_F(ThreadPoolTest, AsyncBarrierArgs) {
109   CHECK_UNSUPPORTED();
110   // Test that async works with a function requiring multiple parameters.
111   std::atomic_int checked_in{0};
112 
113   ThreadPool Pool;
114   for (size_t i = 0; i < 5; ++i) {
115     Pool.async(TestFunc, std::ref(checked_in), i);
116   }
117   Pool.wait();
118   ASSERT_EQ(10, checked_in);
119 }
120 
121 TEST_F(ThreadPoolTest, Async) {
122   CHECK_UNSUPPORTED();
123   ThreadPool Pool;
124   std::atomic_int i{0};
125   Pool.async([this, &i] {
126     waitForMainThread();
127     ++i;
128   });
129   Pool.async([&i] { ++i; });
130   ASSERT_NE(2, i.load());
131   setMainThreadReady();
132   Pool.wait();
133   ASSERT_EQ(2, i.load());
134 }
135 
136 TEST_F(ThreadPoolTest, NonCopyableTask) {
137   CHECK_UNSUPPORTED();
138   ThreadPool Pool;
139   Pool.async([P = std::make_unique<int>()] {});
140   Pool.wait();
141 };
142 
143 TEST_F(ThreadPoolTest, GetFuture) {
144   CHECK_UNSUPPORTED();
145   ThreadPool Pool(hardware_concurrency(2));
146   std::atomic_int i{0};
147   Pool.async([this, &i] {
148     waitForMainThread();
149     ++i;
150   });
151   // Force the future using get()
152   Pool.async([&i] { ++i; }).get();
153   ASSERT_NE(2, i.load());
154   setMainThreadReady();
155   Pool.wait();
156   ASSERT_EQ(2, i.load());
157 }
158 
159 TEST_F(ThreadPoolTest, PoolDestruction) {
160   CHECK_UNSUPPORTED();
161   // Test that we are waiting on destruction
162   std::atomic_int checked_in{0};
163   {
164     ThreadPool Pool;
165     for (size_t i = 0; i < 5; ++i) {
166       Pool.async([this, &checked_in] {
167         waitForMainThread();
168         ++checked_in;
169       });
170     }
171     ASSERT_EQ(0, checked_in);
172     setMainThreadReady();
173   }
174   ASSERT_EQ(5, checked_in);
175 }
176 
177 #if LLVM_ENABLE_THREADS == 1
178 
179 void ThreadPoolTest::RunOnAllSockets(ThreadPoolStrategy S) {
180   // FIXME: Skip these tests on non-Windows because multi-socket system were not
181   // tested on Unix yet, and llvm::get_thread_affinity_mask() isn't implemented
182   // for Unix.
183   Triple Host(Triple::normalize(sys::getProcessTriple()));
184   if (!Host.isOSWindows())
185     return;
186 
187   llvm::DenseSet<llvm::BitVector> ThreadsUsed;
188   std::mutex Lock;
189   {
190     std::condition_variable AllThreads;
191     std::mutex AllThreadsLock;
192     unsigned Active = 0;
193 
194     ThreadPool Pool(S);
195     for (size_t I = 0; I < S.compute_thread_count(); ++I) {
196       Pool.async([&] {
197         {
198           std::lock_guard<std::mutex> Guard(AllThreadsLock);
199           ++Active;
200           AllThreads.notify_one();
201         }
202         waitForMainThread();
203         std::lock_guard<std::mutex> Guard(Lock);
204         auto Mask = llvm::get_thread_affinity_mask();
205         ThreadsUsed.insert(Mask);
206       });
207     }
208     ASSERT_EQ(true, ThreadsUsed.empty());
209     {
210       std::unique_lock<std::mutex> Guard(AllThreadsLock);
211       AllThreads.wait(Guard,
212                       [&]() { return Active == S.compute_thread_count(); });
213     }
214     setMainThreadReady();
215   }
216   ASSERT_EQ(llvm::get_cpus(), ThreadsUsed.size());
217 }
218 
219 TEST_F(ThreadPoolTest, AllThreads_UseAllRessources) {
220   CHECK_UNSUPPORTED();
221   RunOnAllSockets({});
222 }
223 
224 TEST_F(ThreadPoolTest, AllThreads_OneThreadPerCore) {
225   CHECK_UNSUPPORTED();
226   RunOnAllSockets(llvm::heavyweight_hardware_concurrency());
227 }
228 
229 #endif
230