//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include <folly/synchronization/DistributedMutex.h>
#include <folly/container/Array.h>
#include <folly/synchronization/Baton.h>

#include <cstdio>  // printf is used by the fallback main at the bottom

#ifdef OS_AIX
#include "gtest/gtest.h"
#else
#include <gtest/gtest.h>
#endif

#if !defined(ROCKSDB_LITE) && !defined(__ARM_ARCH) && \
    !defined(ROCKSDB_VALGRIND_RUN)

#include <chrono>
#include <cmath>
#include <thread>

namespace folly {
namespace test {
template <template <typename> class Atomic>
using TestDistributedMutex =
    folly::detail::distributed_mutex::DistributedMutex<Atomic, false>;
} // namespace test

namespace {
constexpr auto kStressFactor = 1000;
constexpr auto kStressTestSeconds = 2;
constexpr auto kForever = std::chrono::hours{100};

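// closed form for 1 + 2 + ... + n; the basic stress tests below use this to
// check that every thread id landed in the result exactly `iterations` times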
int sum(int n) {
  return (n * (n + 1)) / 2;
}

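// spawns numThreads threads that each take the mutex `iterations` times,
// using a shared atomic barrier to assert mutual exclusion inside the
// critical section, then verifies that every critical section took effect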
template <template <typename> class Atom = std::atomic>
void basicNThreads(int numThreads, int iterations = kStressFactor) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& result = std::vector<int>{};

  auto&& function = [&](int id) {
    return [&, id] {
      for (auto j = 0; j < iterations; ++j) {
        auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        result.push_back(id);
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    };
  };

  for (auto i = 1; i <= numThreads; ++i) {
    threads.push_back(std::thread(function(i)));
  }
  for (auto& thread : threads) {
    thread.join();
  }

  auto total = 0;
  for (auto value : result) {
    total += value;
  }
  EXPECT_EQ(total, sum(numThreads) * iterations);
}

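// runs a mix of lock()/unlock(), try_lock() and try_lock_for() threads
// against one mutex for the given duration, asserting mutual exclusion with
// the shared barrier on every successful acquisition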
template <template <typename> class Atom = std::atomic>
void lockWithTryAndTimedNThreads(
    int numThreads,
    std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto tryLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock()) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  auto timedLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock_for(kForever)) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(tryLockFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(timedLockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}

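// hammers lock_combine() from every thread; each thread counts how many
// critical sections it has submitted and checks the value returned from the
// combined critical section against that running count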
template <template <typename> class Atom = std::atomic>
void combineNThreads(int numThreads, std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& function = [&]() {
    return [&] {
      auto&& expected = std::uint64_t{0};
      auto&& local = std::atomic<std::uint64_t>{0};
      auto&& result = std::atomic<std::uint64_t>{0};
      while (!stop.load()) {
        ++expected;
        auto current = mutex.lock_combine([&]() {
          result.fetch_add(1);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
          std::this_thread::yield();
          SCOPE_EXIT {
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
          };
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
          return local.fetch_add(1);
        });
        EXPECT_EQ(current, expected - 1);
      }

      EXPECT_EQ(expected, result.load());
    };
  };

  for (auto i = 1; i <= numThreads; ++i) {
    threads.push_back(std::thread(function()));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}

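// splits threads between plain lock()/unlock() and lock_combine() so the
// combined and uncombined acquisition paths contend with each other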
template <template <typename> class Atom = std::atomic>
void combineWithLockNThreads(int numThreads, std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto&& combineFunction = [&]() {
    auto&& expected = std::uint64_t{0};
    auto&& total = std::atomic<std::uint64_t>{0};

    while (!stop.load()) {
      ++expected;
      auto current = mutex.lock_combine([&]() {
        auto iteration = total.fetch_add(1);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
        std::this_thread::yield();
        SCOPE_EXIT {
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        };
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
        return iteration;
      });

      EXPECT_EQ(expected, current + 1);
    }

    EXPECT_EQ(expected, total.load());
  };

  for (auto i = 0; i < (numThreads / 2); ++i) {
    threads.push_back(std::thread(combineFunction));
  }
  for (auto i = 0; i < (numThreads / 2); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}

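// adds try_lock() threads to the combine/lock mix so all three acquisition
// modes contend on the same mutex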
template <template <typename> class Atom = std::atomic>
void combineWithTryLockNThreads(int numThreads, std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto&& combineFunction = [&]() {
    auto&& expected = std::uint64_t{0};
    auto&& total = std::atomic<std::uint64_t>{0};

    while (!stop.load()) {
      ++expected;
      auto current = mutex.lock_combine([&]() {
        auto iteration = total.fetch_add(1);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
        std::this_thread::yield();
        SCOPE_EXIT {
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        };
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
        return iteration;
      });

      EXPECT_EQ(expected, current + 1);
    }

    EXPECT_EQ(expected, total.load());
  };

  auto tryLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock()) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(combineFunction));
  }
  for (auto i = 0; i < (numThreads / 3); ++i) {
    threads.push_back(std::thread(tryLockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}

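// the full mix: lock(), lock_combine(), try_lock() and try_lock_for()
// threads all contend on one mutex, with the combine threads returning a
// cacheline sized array to exercise the large-return codepath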
template <template <typename> class Atom = std::atomic>
void combineWithLockTryAndTimedNThreads(
    int numThreads,
    std::chrono::seconds duration) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& barrier = std::atomic<int>{0};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};

  auto&& lockUnlockFunction = [&]() {
    while (!stop.load()) {
      auto lck = std::unique_lock<_t<std::decay<decltype(mutex)>>>{mutex};
      EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
      std::this_thread::yield();
      EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
    }
  };

  auto&& combineFunction = [&]() {
    auto&& expected = std::uint64_t{0};
    auto&& total = std::atomic<std::uint64_t>{0};

    while (!stop.load()) {
      ++expected;
      auto current = mutex.lock_combine([&]() {
        auto iteration = total.fetch_add(1);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
        std::this_thread::yield();
        SCOPE_EXIT {
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
        };
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);

        // return an object that occupies all the storage we use to coalesce
        // returns, to test that codepath
        return folly::make_array(
            iteration,
            iteration + 1,
            iteration + 2,
            iteration + 3,
            iteration + 4,
            iteration + 5);
      });

      EXPECT_EQ(expected, current[0] + 1);
      EXPECT_EQ(expected, current[1]);
      EXPECT_EQ(expected, current[2] - 1);
      EXPECT_EQ(expected, current[3] - 2);
      EXPECT_EQ(expected, current[4] - 3);
      EXPECT_EQ(expected, current[5] - 4);
    }

    EXPECT_EQ(expected, total.load());
  };

  auto tryLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock()) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  auto timedLockFunction = [&]() {
    while (!stop.load()) {
      using Mutex = _t<std::decay<decltype(mutex)>>;
      auto lck = std::unique_lock<Mutex>{mutex, std::defer_lock};
      if (lck.try_lock_for(kForever)) {
        EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
        std::this_thread::yield();
        EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
      }
    }
  };

  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(lockUnlockFunction));
  }
  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(combineFunction));
  }
  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(tryLockFunction));
  }
  for (auto i = 0; i < (numThreads / 4); ++i) {
    threads.push_back(std::thread(timedLockFunction));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace

TEST(DistributedMutex, InternalDetailTestOne) {
  auto value = 0;
  auto ptr = reinterpret_cast<std::uintptr_t>(&value);
  EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value);
  ptr = ptr | 0b1;
  EXPECT_EQ(folly::detail::distributed_mutex::extractPtr<int>(ptr), &value);
}

TEST(DistributedMutex, Basic) {
  auto&& mutex = folly::DistributedMutex{};
  auto state = mutex.lock();
  mutex.unlock(std::move(state));
}

TEST(DistributedMutex, BasicTryLock) {
  auto&& mutex = folly::DistributedMutex{};

  while (true) {
    auto state = mutex.try_lock();
    if (state) {
      mutex.unlock(std::move(state));
      break;
    }
  }
}

TEST(DistributedMutex, StressTwoThreads) {
  basicNThreads(2);
}
TEST(DistributedMutex, StressThreeThreads) {
  basicNThreads(3);
}
TEST(DistributedMutex, StressFourThreads) {
  basicNThreads(4);
}
TEST(DistributedMutex, StressFiveThreads) {
  basicNThreads(5);
}
TEST(DistributedMutex, StressSixThreads) {
  basicNThreads(6);
}
TEST(DistributedMutex, StressSevenThreads) {
  basicNThreads(7);
}
TEST(DistributedMutex, StressEightThreads) {
  basicNThreads(8);
}
TEST(DistributedMutex, StressSixteenThreads) {
  basicNThreads(16);
}
TEST(DistributedMutex, StressThirtyTwoThreads) {
  basicNThreads(32);
}
TEST(DistributedMutex, StressSixtyFourThreads) {
  basicNThreads(64);
}
TEST(DistributedMutex, StressHundredThreads) {
  basicNThreads(100);
}
TEST(DistributedMutex, StressHardwareConcurrencyThreads) {
  basicNThreads(std::thread::hardware_concurrency());
}

TEST(DistributedMutex, StressThreeThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwelveThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(12, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwentyFourThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(24, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourtyEightThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(48, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHwConcThreadsLockTryAndTimed) {
  lockWithTryAndTimedNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}

TEST(DistributedMutex, StressTwoThreadsCombine) {
  combineNThreads(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThreeThreadsCombine) {
  combineNThreads(3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourThreadsCombine) {
  combineNThreads(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFiveThreadsCombine) {
  combineNThreads(5, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsCombine) {
  combineNThreads(6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSevenThreadsCombine) {
  combineNThreads(7, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressEightThreadsCombine) {
  combineNThreads(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixteenThreadsCombine) {
  combineNThreads(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThirtyTwoThreadsCombine) {
  combineNThreads(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombine) {
  combineNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHundredThreadsCombine) {
  combineNThreads(100, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombine) {
  combineNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}

TEST(DistributedMutex, StressTwoThreadsCombineAndLock) {
  combineWithLockNThreads(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourThreadsCombineAndLock) {
  combineWithLockNThreads(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressEightThreadsCombineAndLock) {
  combineWithLockNThreads(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixteenThreadsCombineAndLock) {
  combineWithLockNThreads(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressThirtyTwoThreadsCombineAndLock) {
  combineWithLockNThreads(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombineAndLock) {
  combineWithLockNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineAndLock) {
  combineWithLockNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}

TEST(DistributedMutex, StressThreeThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwelveThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(12, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(24, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(48, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHardwareConcurrencyThreadsCombineTryLockAndLock) {
  combineWithTryLockNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}

TEST(DistributedMutex, StressThreeThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      3, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      6, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwelveThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      12, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressTwentyFourThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      24, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressFourtyEightThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      48, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressSixtyFourThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      64, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressHwConcurrencyThreadsCombineTryLockLockAndTimed) {
  combineWithLockTryAndTimedNThreads(
      std::thread::hardware_concurrency(),
      std::chrono::seconds{kStressTestSeconds});
}

TEST(DistributedMutex, StressTryLock) {
  auto&& mutex = folly::DistributedMutex{};

  for (auto i = 0; i < kStressFactor; ++i) {
    while (true) {
      auto state = mutex.try_lock();
      if (state) {
        mutex.unlock(std::move(state));
        break;
      }
    }
  }
}

TEST(DistributedMutex, TimedLockTimeout) {
  auto&& mutex = folly::DistributedMutex{};
  auto&& start = folly::Baton<>{};
  auto&& done = folly::Baton<>{};

  auto thread = std::thread{[&]() {
    auto state = mutex.lock();
    start.post();
    done.wait();
    mutex.unlock(std::move(state));
  }};

  start.wait();
  auto result = mutex.try_lock_for(std::chrono::milliseconds{10});
  EXPECT_FALSE(result);
  done.post();
  thread.join();
}

TEST(DistributedMutex, TimedLockAcquireAfterUnlock) {
  auto&& mutex = folly::DistributedMutex{};
  auto&& start = folly::Baton<>{};

  auto thread = std::thread{[&]() {
    auto state = mutex.lock();
    start.post();
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds{10});
    mutex.unlock(std::move(state));
  }};

  start.wait();
  auto result = mutex.try_lock_for(kForever);
  EXPECT_TRUE(result);
  thread.join();
}

namespace {
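// the main thread spins on try_lock() while the spawned threads acquire the
// mutex with lock(); any successful try_lock() must still be exclusive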
template <template <typename> class Atom = std::atomic>
void stressTryLockWithConcurrentLocks(
    int numThreads,
    int iterations = kStressFactor) {
  auto&& threads = std::vector<std::thread>{};
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& atomic = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&] {
      for (auto j = 0; j < iterations; ++j) {
        auto state = mutex.lock();
        EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
        EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
        mutex.unlock(std::move(state));
      }
    }));
  }

  for (auto i = 0; i < iterations; ++i) {
    if (auto state = mutex.try_lock()) {
      EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
      EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
      mutex.unlock(std::move(state));
    }
  }

  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace

TEST(DistributedMutex, StressTryLockWithConcurrentLocksTwoThreads) {
  stressTryLockWithConcurrentLocks(2);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksFourThreads) {
  stressTryLockWithConcurrentLocks(4);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksEightThreads) {
  stressTryLockWithConcurrentLocks(8);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixteenThreads) {
  stressTryLockWithConcurrentLocks(16);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksThirtyTwoThreads) {
  stressTryLockWithConcurrentLocks(32);
}
TEST(DistributedMutex, StressTryLockWithConcurrentLocksSixtyFourThreads) {
  stressTryLockWithConcurrentLocks(64);
}

namespace {
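// all threads contend using only try_lock(); acquisitions may fail, but any
// successful one must provide mutual exclusion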
template <template <typename> class Atom = std::atomic>
void concurrentTryLocks(int numThreads, int iterations = kStressFactor) {
  auto&& threads = std::vector<std::thread>{};
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& atomic = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&] {
      for (auto j = 0; j < iterations; ++j) {
        if (auto state = mutex.try_lock()) {
          EXPECT_EQ(atomic.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(atomic.fetch_sub(1, std::memory_order_relaxed), 1);
          mutex.unlock(std::move(state));
        }
      }
    }));
  }

  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace

TEST(DistributedMutex, StressTryLockWithTwoThreads) {
  concurrentTryLocks(2);
}
TEST(DistributedMutex, StressTryLockFourThreads) {
  concurrentTryLocks(4);
}
TEST(DistributedMutex, StressTryLockEightThreads) {
  concurrentTryLocks(8);
}
TEST(DistributedMutex, StressTryLockSixteenThreads) {
  concurrentTryLocks(16);
}
TEST(DistributedMutex, StressTryLockThirtyTwoThreads) {
  concurrentTryLocks(32);
}
TEST(DistributedMutex, StressTryLockSixtyFourThreads) {
  concurrentTryLocks(64);
}

namespace {
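// counts constructions, assignments and destructions via function-local
// atomic counters so tests can assert how many times lock_combine() moves
// or copies the value returned from a critical section; note that the
// explicit int constructor is the one tallied under defaultConstructs()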
class TestConstruction {
 public:
  TestConstruction() = delete;
  explicit TestConstruction(int) {
    defaultConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction(TestConstruction&&) noexcept {
    moveConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction(const TestConstruction&) {
    copyConstructs().fetch_add(1, std::memory_order_relaxed);
  }
  TestConstruction& operator=(const TestConstruction&) {
    copyAssigns().fetch_add(1, std::memory_order_relaxed);
    return *this;
  }
  TestConstruction& operator=(TestConstruction&&) {
    moveAssigns().fetch_add(1, std::memory_order_relaxed);
    return *this;
  }
  ~TestConstruction() {
    destructs().fetch_add(1, std::memory_order_relaxed);
  }

  static std::atomic<std::uint64_t>& defaultConstructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& moveConstructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& copyConstructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& moveAssigns() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& copyAssigns() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }
  static std::atomic<std::uint64_t>& destructs() {
    static auto&& atomic = std::atomic<std::uint64_t>{0};
    return atomic;
  }

  static void reset() {
    defaultConstructs().store(0);
    moveConstructs().store(0);
    copyConstructs().store(0);
    moveAssigns().store(0);
    copyAssigns().store(0);
    destructs().store(0);
  }
};
} // namespace

TEST(DistributedMutex, TestAppropriateDestructionAndConstructionWithCombine) {
  auto&& mutex = folly::DistributedMutex{};
  auto&& stop = std::atomic<bool>{false};

  // test the simple return path to make sure that in the absence of
  // contention, we get the right number of constructs and destructs
  mutex.lock_combine([]() { return TestConstruction{1}; });
  auto moves = TestConstruction::moveConstructs().load();
  auto defaults = TestConstruction::defaultConstructs().load();
  EXPECT_EQ(TestConstruction::defaultConstructs().load(), 1);
  EXPECT_TRUE(moves == 0 || moves == 1);
  EXPECT_EQ(TestConstruction::destructs().load(), moves + defaults);

  // loop and make sure we were able to test the path where the critical
  // section of the thread gets combined, and assert that we see the expected
  // number of constructions and destructions
  //
  // this implements a timed backoff to test the combined path, so we use the
  // smallest possible delay in tests
  auto thread = std::thread{[&]() {
    auto&& duration = std::chrono::milliseconds{10};
    while (!stop.load()) {
      TestConstruction::reset();
      auto&& ready = folly::Baton<>{};
      auto&& release = folly::Baton<>{};

      // make one thread start its critical section, then signal and wait for
      // another thread to enqueue, to test the combined codepath
      auto innerThread = std::thread{[&]() {
        mutex.lock_combine([&]() {
          ready.post();
          release.wait();
          /* sleep override */
          std::this_thread::sleep_for(duration);
        });
      }};

      // wait for the thread to get in its critical section, then tell it to go
      ready.wait();
      release.post();
      mutex.lock_combine([&]() { return TestConstruction{1}; });

      innerThread.join();

      // at this point we should have exactly one default construct, either
      // 1, 3 or 4 move constructs, and the same number of destructions as
      // constructions
      auto innerDefaults = TestConstruction::defaultConstructs().load();
      auto innerMoves = TestConstruction::moveConstructs().load();
      auto destructs = TestConstruction::destructs().load();
      EXPECT_EQ(innerDefaults, 1);
      EXPECT_TRUE(innerMoves == 3 || innerMoves == 4 || innerMoves == 1);
      EXPECT_EQ(destructs, innerMoves + innerDefaults);
      EXPECT_EQ(TestConstruction::moveAssigns().load(), 0);
      EXPECT_EQ(TestConstruction::copyAssigns().load(), 0);

      // increase duration by 100ms each iteration
      duration = duration + std::chrono::milliseconds{100};
    }
  }};

  /* sleep override */
  std::this_thread::sleep_for(std::chrono::seconds{kStressTestSeconds});
  stop.store(true);
  thread.join();
}

namespace {
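// threads rotate through a small pool of mutexes, combining on each in
// turn, so state from one mutex instance must not bleed into another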
template <template <typename> class Atom = std::atomic>
void concurrentLocksManyMutexes(int numThreads, std::chrono::seconds duration) {
  using DMutex = folly::detail::distributed_mutex::DistributedMutex<Atom>;
  constexpr auto kNumMutexes = 10;
  auto&& threads = std::vector<std::thread>{};
  auto&& mutexes = std::vector<DMutex>(kNumMutexes);
  auto&& barriers = std::vector<std::atomic<std::uint64_t>>(kNumMutexes);
  auto&& stop = std::atomic<bool>{false};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&] {
      auto&& total = std::atomic<std::uint64_t>{0};
      auto&& expected = std::uint64_t{0};

      for (auto j = 0; !stop.load(std::memory_order_relaxed); ++j) {
        auto& mutex = mutexes[j % kNumMutexes];
        auto& barrier = barriers[j % kNumMutexes];

        ++expected;
        auto result = mutex.lock_combine([&]() {
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
          std::this_thread::yield();
          SCOPE_EXIT {
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
          };
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
          return total.fetch_add(1, std::memory_order_relaxed);
        });
        EXPECT_EQ(result, expected - 1);
      }

      EXPECT_EQ(total.load(), expected);
    }));
  }

  /* sleep override */
  std::this_thread::sleep_for(duration);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace

TEST(DistributedMutex, StressWithManyMutexesAlternatingTwoThreads) {
  concurrentLocksManyMutexes(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingFourThreads) {
  concurrentLocksManyMutexes(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingEightThreads) {
  concurrentLocksManyMutexes(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingSixteenThreads) {
  concurrentLocksManyMutexes(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingThirtyTwoThreads) {
  concurrentLocksManyMutexes(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressWithManyMutexesAlternatingSixtyFourThreads) {
  concurrentLocksManyMutexes(64, std::chrono::seconds{kStressTestSeconds});
}

namespace {
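// an exception that carries its integer id through what() and embeds a
// TestConstruction member, so tests can check both propagation and object
// bookkeeping when a combined critical section throws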
class ExceptionWithConstructionTrack : public std::exception {
 public:
  explicit ExceptionWithConstructionTrack(int id)
      : id_{std::to_string(id)}, constructionTrack_{id} {}

  const char* what() const noexcept override {
    return id_.c_str();
  }

 private:
  std::string id_;
  TestConstruction constructionTrack_;
};
} // namespace

TEST(DistributedMutex, TestExceptionPropagationUncontended) {
  TestConstruction::reset();
  auto&& mutex = folly::DistributedMutex{};
  auto&& thread = std::thread{[&]() {
    try {
      mutex.lock_combine([&]() { throw ExceptionWithConstructionTrack{46}; });
    } catch (std::exception& exc) {
      auto integer = std::stoi(exc.what());
      EXPECT_EQ(integer, 46);
      EXPECT_GT(TestConstruction::defaultConstructs(), 0);
    }
    EXPECT_EQ(
        TestConstruction::defaultConstructs(), TestConstruction::destructs());
  }};
  thread.join();
}

namespace {
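// every third combined critical section throws; the exception must deliver
// the iteration number back to the calling thread exactly as a normal
// return value would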
template <template <typename> class Atom = std::atomic>
void concurrentExceptionPropagationStress(
    int numThreads,
    std::chrono::milliseconds t) {
  // this test fails with a false negative under older versions of TSAN for
  // some reason, so disable it when TSAN is enabled
  if (folly::kIsSanitizeThread) {
    return;
  }

  TestConstruction::reset();
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};
  auto&& barrier = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&]() {
      for (auto j = 0; !stop.load(); ++j) {
        auto value = int{0};
        try {
          value = mutex.lock_combine([&]() {
            EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
            EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
            std::this_thread::yield();
            SCOPE_EXIT {
              EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
            };
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);

            // only throw an exception on every third iteration
            if (!(j % 3)) {
              throw ExceptionWithConstructionTrack{j};
            }

            return j;
          });
        } catch (std::exception& exc) {
          value = std::stoi(exc.what());
        }

        EXPECT_EQ(value, j);
      }
    }));
  }

  /* sleep override */
  std::this_thread::sleep_for(t);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace

TEST(DistributedMutex, TestExceptionPropagationStressTwoThreads) {
  concurrentExceptionPropagationStress(
      2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressFourThreads) {
  concurrentExceptionPropagationStress(
      4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressEightThreads) {
  concurrentExceptionPropagationStress(
      8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressSixteenThreads) {
  concurrentExceptionPropagationStress(
      16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressThirtyTwoThreads) {
  concurrentExceptionPropagationStress(
      32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, TestExceptionPropagationStressSixtyFourThreads) {
  concurrentExceptionPropagationStress(
      64, std::chrono::seconds{kStressTestSeconds});
}

namespace {
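// builds a 64-byte array of consecutive values starting at `start`; the big
// value return stress below uses it to check that a cacheline sized return
// from lock_combine() arrives intact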
std::array<std::uint64_t, 8> makeMonotonicArray(int start) {
  auto array = std::array<std::uint64_t, 8>{};
  for (auto& element : array) {
    element = start++;
  }
  return array;
}

template <template <typename> class Atom = std::atomic>
void concurrentBigValueReturnStress(
    int numThreads,
    std::chrono::milliseconds t) {
  auto&& mutex = folly::detail::distributed_mutex::DistributedMutex<Atom>{};
  auto&& threads = std::vector<std::thread>{};
  auto&& stop = std::atomic<bool>{false};
  auto&& barrier = std::atomic<std::uint64_t>{0};

  for (auto i = 0; i < numThreads; ++i) {
    threads.push_back(std::thread([&]() {
      auto&& value = std::atomic<std::uint64_t>{0};

      for (auto j = 0; !stop.load(); ++j) {
        auto returned = mutex.lock_combine([&]() {
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 0);
          EXPECT_EQ(barrier.fetch_add(1, std::memory_order_relaxed), 1);
          std::this_thread::yield();
          // return an entire cacheline's worth of data
          auto current = value.fetch_add(1, std::memory_order_relaxed);
          SCOPE_EXIT {
            EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 1);
          };
          EXPECT_EQ(barrier.fetch_sub(1, std::memory_order_relaxed), 2);
          return makeMonotonicArray(static_cast<int>(current));
        });

        auto expected = value.load() - 1;
        for (auto& element : returned) {
          EXPECT_EQ(element, expected++);
        }
      }
    }));
  }

  /* sleep override */
  std::this_thread::sleep_for(t);
  stop.store(true);
  for (auto& thread : threads) {
    thread.join();
  }
}
} // namespace

TEST(DistributedMutex, StressBigValueReturnTwoThreads) {
  concurrentBigValueReturnStress(2, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnFourThreads) {
  concurrentBigValueReturnStress(4, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnEightThreads) {
  concurrentBigValueReturnStress(8, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnSixteenThreads) {
  concurrentBigValueReturnStress(16, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnThirtyTwoThreads) {
  concurrentBigValueReturnStress(32, std::chrono::seconds{kStressTestSeconds});
}
TEST(DistributedMutex, StressBigValueReturnSixtyFourThreads) {
  concurrentBigValueReturnStress(64, std::chrono::seconds{kStressTestSeconds});
}

}  // namespace folly

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
int main(int /*argc*/, char** /*argv*/) {
  printf(
      "DistributedMutex is not supported in ROCKSDB_LITE, on ARM, or in "
      "valgrind_test runs\n");
  return 0;
}
#endif  // !ROCKSDB_LITE && !__ARM_ARCH && !ROCKSDB_VALGRIND_RUN