1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
9 #include <dirent.h>
10 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
11 #include <dlfcn.h>
12 #endif
13 #include <errno.h>
14 #include <fcntl.h>
15 
16 #if defined(OS_LINUX)
17 #include <linux/fs.h>
18 #endif
19 #if defined(ROCKSDB_IOURING_PRESENT)
20 #include <liburing.h>
21 #endif
22 #include <pthread.h>
23 #include <signal.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/ioctl.h>
28 #include <sys/mman.h>
29 #include <sys/stat.h>
30 #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
31 #include <sys/statfs.h>
32 #include <sys/syscall.h>
33 #include <sys/sysmacros.h>
34 #endif
35 #include <sys/statvfs.h>
36 #include <sys/time.h>
37 #include <sys/types.h>
38 #if defined(ROCKSDB_IOURING_PRESENT)
39 #include <sys/uio.h>
40 #endif
41 #include <time.h>
42 #include <algorithm>
43 // Get nano time includes
44 #if defined(OS_LINUX) || defined(OS_FREEBSD)
45 #elif defined(__MACH__)
46 #include <Availability.h>
47 #include <mach/clock.h>
48 #include <mach/mach.h>
49 #else
50 #include <chrono>
51 #endif
52 #include <deque>
53 #include <set>
54 #include <vector>
55 
56 #include "env/composite_env_wrapper.h"
57 #include "env/io_posix.h"
58 #include "logging/logging.h"
59 #include "logging/posix_logger.h"
60 #include "monitoring/iostats_context_imp.h"
61 #include "monitoring/thread_status_updater.h"
62 #include "port/port.h"
63 #include "rocksdb/options.h"
64 #include "rocksdb/slice.h"
65 #include "test_util/sync_point.h"
66 #include "util/coding.h"
67 #include "util/compression_context_cache.h"
68 #include "util/random.h"
69 #include "util/string_util.h"
70 #include "util/thread_local.h"
71 #include "util/threadpool_imp.h"
72 
73 #if !defined(TMPFS_MAGIC)
74 #define TMPFS_MAGIC 0x01021994
75 #endif
76 #if !defined(XFS_SUPER_MAGIC)
77 #define XFS_SUPER_MAGIC 0x58465342
78 #endif
79 #if !defined(EXT4_SUPER_MAGIC)
80 #define EXT4_SUPER_MAGIC 0xEF53
81 #endif
82 
83 namespace ROCKSDB_NAMESPACE {
// Platform-dependent shared-library conventions consumed by LoadLibrary():
//   kPathSeparator - delimiter between the entries of a library search path.
//   kSharedLibExt  - file-name extension of a shared library.
#if defined(OS_WIN)
static const char kPathSeparator = ';';
static const std::string kSharedLibExt = ".dll";
#elif defined(OS_MACOSX)
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".dylib";
#else
static const char kPathSeparator = ':';
static const std::string kSharedLibExt = ".so";
#endif
95 
96 namespace {
97 
CreateThreadStatusUpdater()98 ThreadStatusUpdater* CreateThreadStatusUpdater() {
99   return new ThreadStatusUpdater();
100 }
101 
102 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
103 class PosixDynamicLibrary : public DynamicLibrary {
104  public:
PosixDynamicLibrary(const std::string & name,void * handle)105   PosixDynamicLibrary(const std::string& name, void* handle)
106       : name_(name), handle_(handle) {}
~PosixDynamicLibrary()107   ~PosixDynamicLibrary() override { dlclose(handle_); }
108 
LoadSymbol(const std::string & sym_name,void ** func)109   Status LoadSymbol(const std::string& sym_name, void** func) override {
110     assert(nullptr != func);
111     dlerror();  // Clear any old error
112     *func = dlsym(handle_, sym_name.c_str());
113     if (*func != nullptr) {
114       return Status::OK();
115     } else {
116       char* err = dlerror();
117       return Status::NotFound("Error finding symbol: " + sym_name, err);
118     }
119   }
120 
Name() const121   const char* Name() const override { return name_.c_str(); }
122 
123  private:
124   std::string name_;
125   void* handle_;
126 };
127 #endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
128 
129 class PosixEnv : public CompositeEnvWrapper {
130  public:
131   // This constructor is for constructing non-default Envs, mainly by
132   // NewCompositeEnv(). It allows new instances to share the same
133   // threadpool and other resources as the default Env, while allowing
134   // a non-default FileSystem implementation
135   PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs);
136 
~PosixEnv()137   ~PosixEnv() override {
138     if (this == Env::Default()) {
139       for (const auto tid : threads_to_join_) {
140         pthread_join(tid, nullptr);
141       }
142       for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
143         thread_pools_[pool_id].JoinAllThreads();
144       }
145       // Do not delete the thread_status_updater_ in order to avoid the
146       // free after use when Env::Default() is destructed while some other
147       // child threads are still trying to update thread status. All
148       // PosixEnv instances use the same thread_status_updater_, so never
149       // explicitly delete it.
150     }
151   }
152 
SetFD_CLOEXEC(int fd,const EnvOptions * options)153   void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
154     if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
155       fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
156     }
157   }
158 
159 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
160   // Loads the named library into the result.
161   // If the input name is empty, the current executable is loaded
162   // On *nix systems, a "lib" prefix is added to the name if one is not supplied
163   // Comparably, the appropriate shared library extension is added to the name
164   // if not supplied. If search_path is not specified, the shared library will
165   // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
166   // specified, the shared library will be searched for in the directories
167   // provided by the search path
LoadLibrary(const std::string & name,const std::string & path,std::shared_ptr<DynamicLibrary> * result)168   Status LoadLibrary(const std::string& name, const std::string& path,
169                      std::shared_ptr<DynamicLibrary>* result) override {
170     Status status;
171     assert(result != nullptr);
172     if (name.empty()) {
173       void* hndl = dlopen(NULL, RTLD_NOW);
174       if (hndl != nullptr) {
175         result->reset(new PosixDynamicLibrary(name, hndl));
176         return Status::OK();
177       }
178     } else {
179       std::string library_name = name;
180       if (library_name.find(kSharedLibExt) == std::string::npos) {
181         library_name = library_name + kSharedLibExt;
182       }
183 #if !defined(OS_WIN)
184       if (library_name.find('/') == std::string::npos &&
185           library_name.compare(0, 3, "lib") != 0) {
186         library_name = "lib" + library_name;
187       }
188 #endif
189       if (path.empty()) {
190         void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
191         if (hndl != nullptr) {
192           result->reset(new PosixDynamicLibrary(library_name, hndl));
193           return Status::OK();
194         }
195       } else {
196         std::string local_path;
197         std::stringstream ss(path);
198         while (getline(ss, local_path, kPathSeparator)) {
199           if (!path.empty()) {
200             std::string full_name = local_path + "/" + library_name;
201             void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
202             if (hndl != nullptr) {
203               result->reset(new PosixDynamicLibrary(full_name, hndl));
204               return Status::OK();
205             }
206           }
207         }
208       }
209     }
210     return Status::IOError(
211         IOErrorMsg("Failed to open shared library: xs", name), dlerror());
212   }
213 #endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
214 
215   void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
216                 void* tag = nullptr,
217                 void (*unschedFunction)(void* arg) = nullptr) override;
218 
219   int UnSchedule(void* arg, Priority pri) override;
220 
221   void StartThread(void (*function)(void* arg), void* arg) override;
222 
223   void WaitForJoin() override;
224 
225   unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
226 
GetTestDirectory(std::string * result)227   Status GetTestDirectory(std::string* result) override {
228     const char* env = getenv("TEST_TMPDIR");
229     if (env && env[0] != '\0') {
230       *result = env;
231     } else {
232       char buf[100];
233       snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
234       *result = buf;
235     }
236     // Directory may already exist
237     CreateDir(*result);
238     return Status::OK();
239   }
240 
GetThreadList(std::vector<ThreadStatus> * thread_list)241   Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
242     assert(thread_status_updater_);
243     return thread_status_updater_->GetThreadList(thread_list);
244   }
245 
gettid(pthread_t tid)246   static uint64_t gettid(pthread_t tid) {
247     uint64_t thread_id = 0;
248     memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
249     return thread_id;
250   }
251 
gettid()252   static uint64_t gettid() {
253     pthread_t tid = pthread_self();
254     return gettid(tid);
255   }
256 
GetThreadID() const257   uint64_t GetThreadID() const override { return gettid(pthread_self()); }
258 
NowMicros()259   uint64_t NowMicros() override {
260     struct timeval tv;
261     gettimeofday(&tv, nullptr);
262     return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
263   }
264 
NowNanos()265   uint64_t NowNanos() override {
266 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
267     struct timespec ts;
268     clock_gettime(CLOCK_MONOTONIC, &ts);
269     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
270 #elif defined(OS_SOLARIS)
271     return gethrtime();
272 #elif defined(__MACH__)
273     clock_serv_t cclock;
274     mach_timespec_t ts;
275     host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
276     clock_get_time(cclock, &ts);
277     mach_port_deallocate(mach_task_self(), cclock);
278     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
279 #else
280     return std::chrono::duration_cast<std::chrono::nanoseconds>(
281        std::chrono::steady_clock::now().time_since_epoch()).count();
282 #endif
283   }
284 
NowCPUNanos()285   uint64_t NowCPUNanos() override {
286 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
287     (defined(__MACH__) && defined(__MAC_10_12))
288     struct timespec ts;
289     clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
290     return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
291 #endif
292     return 0;
293   }
294 
SleepForMicroseconds(int micros)295   void SleepForMicroseconds(int micros) override { usleep(micros); }
296 
GetHostName(char * name,uint64_t len)297   Status GetHostName(char* name, uint64_t len) override {
298     int ret = gethostname(name, static_cast<size_t>(len));
299     if (ret < 0) {
300       if (errno == EFAULT || errno == EINVAL) {
301         return Status::InvalidArgument(strerror(errno));
302       } else {
303         return IOError("GetHostName", name, errno);
304       }
305     }
306     return Status::OK();
307   }
308 
GetCurrentTime(int64_t * unix_time)309   Status GetCurrentTime(int64_t* unix_time) override {
310     time_t ret = time(nullptr);
311     if (ret == (time_t) -1) {
312       return IOError("GetCurrentTime", "", errno);
313     }
314     *unix_time = (int64_t) ret;
315     return Status::OK();
316   }
317 
GetThreadStatusUpdater() const318   ThreadStatusUpdater* GetThreadStatusUpdater() const override {
319     return Env::GetThreadStatusUpdater();
320   }
321 
GenerateUniqueId()322   std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
323 
324   // Allow increasing the number of worker threads.
SetBackgroundThreads(int num,Priority pri)325   void SetBackgroundThreads(int num, Priority pri) override {
326     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
327     thread_pools_[pri].SetBackgroundThreads(num);
328   }
329 
GetBackgroundThreads(Priority pri)330   int GetBackgroundThreads(Priority pri) override {
331     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
332     return thread_pools_[pri].GetBackgroundThreads();
333   }
334 
SetAllowNonOwnerAccess(bool allow_non_owner_access)335   Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
336     allow_non_owner_access_ = allow_non_owner_access;
337     return Status::OK();
338   }
339 
340   // Allow increasing the number of worker threads.
IncBackgroundThreadsIfNeeded(int num,Priority pri)341   void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
342     assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
343     thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
344   }
345 
LowerThreadPoolIOPriority(Priority pool=LOW)346   void LowerThreadPoolIOPriority(Priority pool = LOW) override {
347     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
348 #ifdef OS_LINUX
349     thread_pools_[pool].LowerIOPriority();
350 #else
351     (void)pool;
352 #endif
353   }
354 
LowerThreadPoolCPUPriority(Priority pool=LOW)355   void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
356     assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
357 #ifdef OS_LINUX
358     thread_pools_[pool].LowerCPUPriority();
359 #else
360     (void)pool;
361 #endif
362   }
363 
TimeToString(uint64_t secondsSince1970)364   std::string TimeToString(uint64_t secondsSince1970) override {
365     const time_t seconds = (time_t)secondsSince1970;
366     struct tm t;
367     int maxsize = 64;
368     std::string dummy;
369     dummy.reserve(maxsize);
370     dummy.resize(maxsize);
371     char* p = &dummy[0];
372     localtime_r(&seconds, &t);
373     snprintf(p, maxsize,
374              "%04d/%02d/%02d-%02d:%02d:%02d ",
375              t.tm_year + 1900,
376              t.tm_mon + 1,
377              t.tm_mday,
378              t.tm_hour,
379              t.tm_min,
380              t.tm_sec);
381     return dummy;
382   }
383 
384  private:
385   friend Env* Env::Default();
386   // Constructs the default Env, a singleton
387   PosixEnv();
388 
389   // The below 4 members are only used by the default PosixEnv instance.
390   // Non-default instances simply maintain references to the backing
391   // members in te default instance
392   std::vector<ThreadPoolImpl> thread_pools_storage_;
393   pthread_mutex_t mu_storage_;
394   std::vector<pthread_t> threads_to_join_storage_;
395   bool allow_non_owner_access_storage_;
396 
397   std::vector<ThreadPoolImpl>& thread_pools_;
398   pthread_mutex_t& mu_;
399   std::vector<pthread_t>& threads_to_join_;
400   // If true, allow non owner read access for db files. Otherwise, non-owner
401   //  has no access to db files.
402   bool& allow_non_owner_access_;
403 };
404 
PosixEnv()405 PosixEnv::PosixEnv()
406     : CompositeEnvWrapper(this, FileSystem::Default()),
407       thread_pools_storage_(Priority::TOTAL),
408       allow_non_owner_access_storage_(true),
409       thread_pools_(thread_pools_storage_),
410       mu_(mu_storage_),
411       threads_to_join_(threads_to_join_storage_),
412       allow_non_owner_access_(allow_non_owner_access_storage_) {
413   ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
414   for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
415     thread_pools_[pool_id].SetThreadPriority(
416         static_cast<Env::Priority>(pool_id));
417     // This allows later initializing the thread-local-env of each thread.
418     thread_pools_[pool_id].SetHostEnv(this);
419   }
420   thread_status_updater_ = CreateThreadStatusUpdater();
421 }
422 
// Builds a non-default Env on top of `fs`. The *_storage_ members are not
// named in the initializer list; every reference member is bound to the
// state reached through `default_env`, so both instances operate on the same
// thread pools, mutex, join list and access flag.
PosixEnv::PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs)
    : CompositeEnvWrapper(this, fs),
      thread_pools_(default_env->thread_pools_),
      mu_(default_env->mu_),
      threads_to_join_(default_env->threads_to_join_),
      allow_non_owner_access_(default_env->allow_non_owner_access_) {
  // Reuse the updater of `default_env` rather than creating another one.
  thread_status_updater_ = default_env->thread_status_updater_;
}
431 
Schedule(void (* function)(void * arg1),void * arg,Priority pri,void * tag,void (* unschedFunction)(void * arg))432 void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
433                         void* tag, void (*unschedFunction)(void* arg)) {
434   assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
435   thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
436 }
437 
UnSchedule(void * arg,Priority pri)438 int PosixEnv::UnSchedule(void* arg, Priority pri) {
439   return thread_pools_[pri].UnSchedule(arg);
440 }
441 
GetThreadPoolQueueLen(Priority pri) const442 unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
443   assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
444   return thread_pools_[pri].GetQueueLen();
445 }
446 
// Bundles a callback with the argument it is to be invoked with, so that the
// pair can travel through a single void* thread argument.
struct StartThreadState {
  using UserFunction = void (*)(void*);

  UserFunction user_function;  // Callback to run
  void* arg;                   // Opaque argument passed to user_function
};
451 
StartThreadWrapper(void * arg)452 static void* StartThreadWrapper(void* arg) {
453   StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
454   state->user_function(state->arg);
455   delete state;
456   return nullptr;
457 }
458 
StartThread(void (* function)(void * arg),void * arg)459 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
460   pthread_t t;
461   StartThreadState* state = new StartThreadState;
462   state->user_function = function;
463   state->arg = arg;
464   ThreadPoolImpl::PthreadCall(
465       "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
466   ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
467   threads_to_join_.push_back(t);
468   ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
469 }
470 
WaitForJoin()471 void PosixEnv::WaitForJoin() {
472   for (const auto tid : threads_to_join_) {
473     pthread_join(tid, nullptr);
474   }
475   threads_to_join_.clear();
476 }
477 
478 }  // namespace
479 
GenerateUniqueId()480 std::string Env::GenerateUniqueId() {
481   std::string uuid_file = "/proc/sys/kernel/random/uuid";
482 
483   Status s = FileExists(uuid_file);
484   if (s.ok()) {
485     std::string uuid;
486     s = ReadFileToString(this, uuid_file, &uuid);
487     if (s.ok()) {
488       return uuid;
489     }
490   }
491   // Could not read uuid_file - generate uuid using "nanos-random"
492   Random64 r(time(nullptr));
493   uint64_t random_uuid_portion =
494     r.Uniform(std::numeric_limits<uint64_t>::max());
495   uint64_t nanos_uuid_portion = NowNanos();
496   char uuid2[200];
497   snprintf(uuid2,
498            200,
499            "%lx-%lx",
500            (unsigned long)nanos_uuid_portion,
501            (unsigned long)random_uuid_portion);
502   return uuid2;
503 }
504 
505 //
506 // Default Posix Env
507 //
// Returns the default Env: a PosixEnv held in a function-local static, which
// is therefore constructed during the first call.
Env* Env::Default() {
  // The following function call initializes the singletons of ThreadLocalPtr
  // right before the static default_env.  This guarantees that default_env
  // is always destructed before the ThreadLocalPtr singletons are, because
  // C++ destroys static variables in the reverse order of their
  // construction.
  //
  // NOTE(review): the two initialization calls after it presumably exist for
  // the same ordering reason - confirm against their definitions. Do not
  // reorder these statements.
  ThreadLocalPtr::InitSingletons();
  CompressionContextCache::InitSingleton();
  INIT_SYNC_POINT_SINGLETONS();
  static PosixEnv default_env;
  return &default_env;
}
525 
NewCompositeEnv(std::shared_ptr<FileSystem> fs)526 std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
527   PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
528   return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
529 }
530 
531 }  // namespace ROCKSDB_NAMESPACE
532