1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "util/threadpool_imp.h"
11 
12 #ifndef OS_WIN
13 #  include <unistd.h>
14 #endif
15 
16 #ifdef OS_LINUX
17 #  include <sys/syscall.h>
18 #  include <sys/resource.h>
19 #endif
20 
21 #include <stdlib.h>
22 #include <algorithm>
23 #include <atomic>
24 #include <condition_variable>
25 #include <deque>
26 #include <mutex>
27 #include <sstream>
28 #include <thread>
29 #include <vector>
30 
31 #include "monitoring/thread_status_util.h"
32 #include "port/port.h"
33 #include "test_util/sync_point.h"
34 
35 namespace ROCKSDB_NAMESPACE {
36 
PthreadCall(const char * label,int result)37 void ThreadPoolImpl::PthreadCall(const char* label, int result) {
38   if (result != 0) {
39     fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
40     abort();
41   }
42 }
43 
44 struct ThreadPoolImpl::Impl {
45 
46   Impl();
47   ~Impl();
48 
49   void JoinThreads(bool wait_for_jobs_to_complete);
50 
51   void SetBackgroundThreadsInternal(int num, bool allow_reduce);
52   int GetBackgroundThreads();
53 
GetQueueLenROCKSDB_NAMESPACE::ThreadPoolImpl::Impl54   unsigned int GetQueueLen() const {
55     return queue_len_.load(std::memory_order_relaxed);
56   }
57 
58   void LowerIOPriority();
59 
60   void LowerCPUPriority();
61 
WakeUpAllThreadsROCKSDB_NAMESPACE::ThreadPoolImpl::Impl62   void WakeUpAllThreads() {
63     bgsignal_.notify_all();
64   }
65 
66   void BGThread(size_t thread_id);
67 
68   void StartBGThreads();
69 
70   void Submit(std::function<void()>&& schedule,
71     std::function<void()>&& unschedule, void* tag);
72 
73   int UnSchedule(void* arg);
74 
SetHostEnvROCKSDB_NAMESPACE::ThreadPoolImpl::Impl75   void SetHostEnv(Env* env) { env_ = env; }
76 
GetHostEnvROCKSDB_NAMESPACE::ThreadPoolImpl::Impl77   Env* GetHostEnv() const { return env_; }
78 
HasExcessiveThreadROCKSDB_NAMESPACE::ThreadPoolImpl::Impl79   bool HasExcessiveThread() const {
80     return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
81   }
82 
83   // Return true iff the current thread is the excessive thread to terminate.
84   // Always terminate the running thread that is added last, even if there are
85   // more than one thread to terminate.
IsLastExcessiveThreadROCKSDB_NAMESPACE::ThreadPoolImpl::Impl86   bool IsLastExcessiveThread(size_t thread_id) const {
87     return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
88   }
89 
IsExcessiveThreadROCKSDB_NAMESPACE::ThreadPoolImpl::Impl90   bool IsExcessiveThread(size_t thread_id) const {
91     return static_cast<int>(thread_id) >= total_threads_limit_;
92   }
93 
94   // Return the thread priority.
95   // This would allow its member-thread to know its priority.
GetThreadPriorityROCKSDB_NAMESPACE::ThreadPoolImpl::Impl96   Env::Priority GetThreadPriority() const { return priority_; }
97 
98   // Set the thread priority.
SetThreadPriorityROCKSDB_NAMESPACE::ThreadPoolImpl::Impl99   void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
100 
101 private:
102  static void BGThreadWrapper(void* arg);
103 
104  bool low_io_priority_;
105  bool low_cpu_priority_;
106  Env::Priority priority_;
107  Env* env_;
108 
109  int total_threads_limit_;
110  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
111  bool exit_all_threads_;
112  bool wait_for_jobs_to_complete_;
113 
114  // Entry per Schedule()/Submit() call
115  struct BGItem {
116    void* tag = nullptr;
117    std::function<void()> function;
118    std::function<void()> unschedFunction;
119   };
120 
121   using BGQueue = std::deque<BGItem>;
122   BGQueue       queue_;
123 
124   std::mutex               mu_;
125   std::condition_variable  bgsignal_;
126   std::vector<port::Thread> bgthreads_;
127 };
128 
129 
130 inline
Impl()131 ThreadPoolImpl::Impl::Impl()
132     :
133       low_io_priority_(false),
134       low_cpu_priority_(false),
135       priority_(Env::LOW),
136       env_(nullptr),
137       total_threads_limit_(0),
138       queue_len_(),
139       exit_all_threads_(false),
140       wait_for_jobs_to_complete_(false),
141       queue_(),
142       mu_(),
143       bgsignal_(),
144       bgthreads_() {
145 }
146 
147 inline
~Impl()148 ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
149 
JoinThreads(bool wait_for_jobs_to_complete)150 void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
151 
152   std::unique_lock<std::mutex> lock(mu_);
153   assert(!exit_all_threads_);
154 
155   wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
156   exit_all_threads_ = true;
157   // prevent threads from being recreated right after they're joined, in case
158   // the user is concurrently submitting jobs.
159   total_threads_limit_ = 0;
160 
161   lock.unlock();
162 
163   bgsignal_.notify_all();
164 
165   for (auto& th : bgthreads_) {
166     th.join();
167   }
168 
169   bgthreads_.clear();
170 
171   exit_all_threads_ = false;
172   wait_for_jobs_to_complete_ = false;
173 }
174 
175 inline
LowerIOPriority()176 void ThreadPoolImpl::Impl::LowerIOPriority() {
177   std::lock_guard<std::mutex> lock(mu_);
178   low_io_priority_ = true;
179 }
180 
181 inline
LowerCPUPriority()182 void ThreadPoolImpl::Impl::LowerCPUPriority() {
183   std::lock_guard<std::mutex> lock(mu_);
184   low_cpu_priority_ = true;
185 }
186 
BGThread(size_t thread_id)187 void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
188   bool low_io_priority = false;
189   bool low_cpu_priority = false;
190 
191   while (true) {
192     // Wait until there is an item that is ready to run
193     std::unique_lock<std::mutex> lock(mu_);
194     // Stop waiting if the thread needs to do work or needs to terminate.
195     while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
196            (queue_.empty() || IsExcessiveThread(thread_id))) {
197       bgsignal_.wait(lock);
198     }
199 
200     if (exit_all_threads_) {  // mechanism to let BG threads exit safely
201 
202       if (!wait_for_jobs_to_complete_ ||
203           queue_.empty()) {
204         break;
205        }
206     }
207 
208     if (IsLastExcessiveThread(thread_id)) {
209       // Current thread is the last generated one and is excessive.
210       // We always terminate excessive thread in the reverse order of
211       // generation time.
212       auto& terminating_thread = bgthreads_.back();
213       terminating_thread.detach();
214       bgthreads_.pop_back();
215 
216       if (HasExcessiveThread()) {
217         // There is still at least more excessive thread to terminate.
218         WakeUpAllThreads();
219       }
220       break;
221     }
222 
223     auto func = std::move(queue_.front().function);
224     queue_.pop_front();
225 
226     queue_len_.store(static_cast<unsigned int>(queue_.size()),
227                      std::memory_order_relaxed);
228 
229     bool decrease_io_priority = (low_io_priority != low_io_priority_);
230     bool decrease_cpu_priority = (low_cpu_priority != low_cpu_priority_);
231     lock.unlock();
232 
233 #ifdef OS_LINUX
234     if (decrease_cpu_priority) {
235       // 0 means current thread.
236       port::SetCpuPriority(0, CpuPriority::kLow);
237       low_cpu_priority = true;
238     }
239 
240     if (decrease_io_priority) {
241 #define IOPRIO_CLASS_SHIFT (13)
242 #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
243       // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
244       // These system calls only have an effect when used in conjunction
245       // with an I/O scheduler that supports I/O priorities. As at
246       // kernel 2.6.17 the only such scheduler is the Completely
247       // Fair Queuing (CFQ) I/O scheduler.
248       // To change scheduler:
249       //  echo cfq > /sys/block/<device_name>/queue/schedule
250       // Tunables to consider:
251       //  /sys/block/<device_name>/queue/slice_idle
252       //  /sys/block/<device_name>/queue/slice_sync
253       syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
254               0,                  // current thread
255               IOPRIO_PRIO_VALUE(3, 0));
256       low_io_priority = true;
257     }
258 #else
259     (void)decrease_io_priority;  // avoid 'unused variable' error
260     (void)decrease_cpu_priority;
261 #endif
262 
263     TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
264                              &priority_);
265 
266     func();
267   }
268 }
269 
270 // Helper struct for passing arguments when creating threads.
271 struct BGThreadMetadata {
272   ThreadPoolImpl::Impl* thread_pool_;
273   size_t thread_id_;  // Thread count in the thread.
BGThreadMetadataROCKSDB_NAMESPACE::BGThreadMetadata274   BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
275       : thread_pool_(thread_pool), thread_id_(thread_id) {}
276 };
277 
BGThreadWrapper(void * arg)278 void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
279   BGThreadMetadata* meta = reinterpret_cast<BGThreadMetadata*>(arg);
280   size_t thread_id = meta->thread_id_;
281   ThreadPoolImpl::Impl* tp = meta->thread_pool_;
282 #ifdef ROCKSDB_USING_THREAD_STATUS
283   // initialize it because compiler isn't good enough to see we don't use it
284   // uninitialized
285   ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
286   switch (tp->GetThreadPriority()) {
287     case Env::Priority::HIGH:
288       thread_type = ThreadStatus::HIGH_PRIORITY;
289       break;
290     case Env::Priority::LOW:
291       thread_type = ThreadStatus::LOW_PRIORITY;
292       break;
293     case Env::Priority::BOTTOM:
294       thread_type = ThreadStatus::BOTTOM_PRIORITY;
295       break;
296     case Env::Priority::USER:
297       thread_type = ThreadStatus::USER;
298       break;
299     case Env::Priority::TOTAL:
300       assert(false);
301       return;
302   }
303   assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
304   ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
305 #endif
306   delete meta;
307   tp->BGThread(thread_id);
308 #ifdef ROCKSDB_USING_THREAD_STATUS
309   ThreadStatusUtil::UnregisterThread();
310 #endif
311   return;
312 }
313 
SetBackgroundThreadsInternal(int num,bool allow_reduce)314 void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
315   bool allow_reduce) {
316   std::lock_guard<std::mutex> lock(mu_);
317   if (exit_all_threads_) {
318     return;
319   }
320   if (num > total_threads_limit_ ||
321       (num < total_threads_limit_ && allow_reduce)) {
322     total_threads_limit_ = std::max(0, num);
323     WakeUpAllThreads();
324     StartBGThreads();
325   }
326 }
327 
GetBackgroundThreads()328 int ThreadPoolImpl::Impl::GetBackgroundThreads() {
329   std::unique_lock<std::mutex> lock(mu_);
330   return total_threads_limit_;
331 }
332 
StartBGThreads()333 void ThreadPoolImpl::Impl::StartBGThreads() {
334   // Start background thread if necessary
335   while ((int)bgthreads_.size() < total_threads_limit_) {
336 
337     port::Thread p_t(&BGThreadWrapper,
338       new BGThreadMetadata(this, bgthreads_.size()));
339 
340 // Set the thread name to aid debugging
341 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
342 #if __GLIBC_PREREQ(2, 12)
343     auto th_handle = p_t.native_handle();
344     std::string thread_priority = Env::PriorityToString(GetThreadPriority());
345     std::ostringstream thread_name_stream;
346     thread_name_stream << "rocksdb:";
347     for (char c : thread_priority) {
348       thread_name_stream << static_cast<char>(tolower(c));
349     }
350     thread_name_stream << bgthreads_.size();
351     pthread_setname_np(th_handle, thread_name_stream.str().c_str());
352 #endif
353 #endif
354     bgthreads_.push_back(std::move(p_t));
355   }
356 }
357 
Submit(std::function<void ()> && schedule,std::function<void ()> && unschedule,void * tag)358 void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
359   std::function<void()>&& unschedule, void* tag) {
360 
361   std::lock_guard<std::mutex> lock(mu_);
362 
363   if (exit_all_threads_) {
364     return;
365   }
366 
367   StartBGThreads();
368 
369   // Add to priority queue
370   queue_.push_back(BGItem());
371 
372   auto& item = queue_.back();
373   item.tag = tag;
374   item.function = std::move(schedule);
375   item.unschedFunction = std::move(unschedule);
376 
377   queue_len_.store(static_cast<unsigned int>(queue_.size()),
378     std::memory_order_relaxed);
379 
380   if (!HasExcessiveThread()) {
381     // Wake up at least one waiting thread.
382     bgsignal_.notify_one();
383   } else {
384     // Need to wake up all threads to make sure the one woken
385     // up is not the one to terminate.
386     WakeUpAllThreads();
387   }
388 }
389 
UnSchedule(void * arg)390 int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
391   int count = 0;
392 
393   std::vector<std::function<void()>> candidates;
394   {
395     std::lock_guard<std::mutex> lock(mu_);
396 
397     // Remove from priority queue
398     BGQueue::iterator it = queue_.begin();
399     while (it != queue_.end()) {
400       if (arg == (*it).tag) {
401         if (it->unschedFunction) {
402           candidates.push_back(std::move(it->unschedFunction));
403         }
404         it = queue_.erase(it);
405         count++;
406       } else {
407         ++it;
408       }
409     }
410     queue_len_.store(static_cast<unsigned int>(queue_.size()),
411       std::memory_order_relaxed);
412   }
413 
414 
415  // Run unschedule functions outside the mutex
416   for (auto& f : candidates) {
417     f();
418   }
419 
420   return count;
421 }
422 
ThreadPoolImpl()423 ThreadPoolImpl::ThreadPoolImpl() :
424   impl_(new Impl()) {
425 }
426 
427 
~ThreadPoolImpl()428 ThreadPoolImpl::~ThreadPoolImpl() {
429 }
430 
JoinAllThreads()431 void ThreadPoolImpl::JoinAllThreads() {
432   impl_->JoinThreads(false);
433 }
434 
SetBackgroundThreads(int num)435 void ThreadPoolImpl::SetBackgroundThreads(int num) {
436   impl_->SetBackgroundThreadsInternal(num, true);
437 }
438 
GetBackgroundThreads()439 int ThreadPoolImpl::GetBackgroundThreads() {
440   return impl_->GetBackgroundThreads();
441 }
442 
GetQueueLen() const443 unsigned int ThreadPoolImpl::GetQueueLen() const {
444   return impl_->GetQueueLen();
445 }
446 
WaitForJobsAndJoinAllThreads()447 void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
448   impl_->JoinThreads(true);
449 }
450 
LowerIOPriority()451 void ThreadPoolImpl::LowerIOPriority() {
452   impl_->LowerIOPriority();
453 }
454 
LowerCPUPriority()455 void ThreadPoolImpl::LowerCPUPriority() {
456   impl_->LowerCPUPriority();
457 }
458 
IncBackgroundThreadsIfNeeded(int num)459 void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
460   impl_->SetBackgroundThreadsInternal(num, false);
461 }
462 
SubmitJob(const std::function<void ()> & job)463 void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
464   auto copy(job);
465   impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
466 }
467 
468 
SubmitJob(std::function<void ()> && job)469 void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
470   impl_->Submit(std::move(job), std::function<void()>(), nullptr);
471 }
472 
Schedule(void (* function)(void * arg1),void * arg,void * tag,void (* unschedFunction)(void * arg))473 void ThreadPoolImpl::Schedule(void(*function)(void* arg1), void* arg,
474   void* tag, void(*unschedFunction)(void* arg)) {
475   if (unschedFunction == nullptr) {
476     impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
477   } else {
478     impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
479                   tag);
480   }
481 }
482 
UnSchedule(void * arg)483 int ThreadPoolImpl::UnSchedule(void* arg) {
484   return impl_->UnSchedule(arg);
485 }
486 
SetHostEnv(Env * env)487 void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
488 
GetHostEnv() const489 Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
490 
491 // Return the thread priority.
492 // This would allow its member-thread to know its priority.
GetThreadPriority() const493 Env::Priority ThreadPoolImpl::GetThreadPriority() const {
494   return impl_->GetThreadPriority();
495 }
496 
497 // Set the thread priority.
SetThreadPriority(Env::Priority priority)498 void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
499   impl_->SetThreadPriority(priority);
500 }
501 
NewThreadPool(int num_threads)502 ThreadPool* NewThreadPool(int num_threads) {
503   ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
504   thread_pool->SetBackgroundThreads(num_threads);
505   return thread_pool;
506 }
507 
508 }  // namespace ROCKSDB_NAMESPACE
509