1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "rocksdb/env.h"
11 
12 #include <thread>
13 #include "env/composite_env_wrapper.h"
14 #include "logging/env_logger.h"
15 #include "memory/arena.h"
16 #include "options/db_options.h"
17 #include "port/port.h"
18 #include "port/sys_time.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/utilities/object_registry.h"
21 #include "util/autovector.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
Env()25 Env::Env() : thread_status_updater_(nullptr) {
26   file_system_ = std::make_shared<LegacyFileSystemWrapper>(this);
27 }
28 
Env(std::shared_ptr<FileSystem> fs)29 Env::Env(std::shared_ptr<FileSystem> fs)
30   : thread_status_updater_(nullptr),
31     file_system_(fs) {}
32 
~Env()33 Env::~Env() {
34 }
35 
NewLogger(const std::string & fname,std::shared_ptr<Logger> * result)36 Status Env::NewLogger(const std::string& fname,
37                       std::shared_ptr<Logger>* result) {
38   return NewEnvLogger(fname, this, result);
39 }
40 
LoadEnv(const std::string & value,Env ** result)41 Status Env::LoadEnv(const std::string& value, Env** result) {
42   Env* env = *result;
43   Status s;
44 #ifndef ROCKSDB_LITE
45   s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
46 #else
47   s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
48 #endif
49   if (s.ok()) {
50     *result = env;
51   }
52   return s;
53 }
54 
LoadEnv(const std::string & value,Env ** result,std::shared_ptr<Env> * guard)55 Status Env::LoadEnv(const std::string& value, Env** result,
56                     std::shared_ptr<Env>* guard) {
57   assert(result);
58   Status s;
59 #ifndef ROCKSDB_LITE
60   Env* env = nullptr;
61   std::unique_ptr<Env> uniq_guard;
62   std::string err_msg;
63   assert(guard != nullptr);
64   env = ObjectRegistry::NewInstance()->NewObject<Env>(value, &uniq_guard,
65                                                       &err_msg);
66   if (!env) {
67     s = Status::NotFound(std::string("Cannot load ") + Env::Type() + ": " +
68                          value);
69     env = Env::Default();
70   }
71   if (s.ok() && uniq_guard) {
72     guard->reset(uniq_guard.release());
73     *result = guard->get();
74   } else {
75     *result = env;
76   }
77 #else
78   (void)result;
79   (void)guard;
80   s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
81 #endif
82   return s;
83 }
84 
PriorityToString(Env::Priority priority)85 std::string Env::PriorityToString(Env::Priority priority) {
86   switch (priority) {
87     case Env::Priority::BOTTOM:
88       return "Bottom";
89     case Env::Priority::LOW:
90       return "Low";
91     case Env::Priority::HIGH:
92       return "High";
93     case Env::Priority::USER:
94       return "User";
95     case Env::Priority::TOTAL:
96       assert(false);
97   }
98   return "Invalid";
99 }
100 
GetThreadID() const101 uint64_t Env::GetThreadID() const {
102   std::hash<std::thread::id> hasher;
103   return hasher(std::this_thread::get_id());
104 }
105 
ReuseWritableFile(const std::string & fname,const std::string & old_fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)106 Status Env::ReuseWritableFile(const std::string& fname,
107                               const std::string& old_fname,
108                               std::unique_ptr<WritableFile>* result,
109                               const EnvOptions& options) {
110   Status s = RenameFile(old_fname, fname);
111   if (!s.ok()) {
112     return s;
113   }
114   return NewWritableFile(fname, result, options);
115 }
116 
GetChildrenFileAttributes(const std::string & dir,std::vector<FileAttributes> * result)117 Status Env::GetChildrenFileAttributes(const std::string& dir,
118                                       std::vector<FileAttributes>* result) {
119   assert(result != nullptr);
120   std::vector<std::string> child_fnames;
121   Status s = GetChildren(dir, &child_fnames);
122   if (!s.ok()) {
123     return s;
124   }
125   result->resize(child_fnames.size());
126   size_t result_size = 0;
127   for (size_t i = 0; i < child_fnames.size(); ++i) {
128     const std::string path = dir + "/" + child_fnames[i];
129     if (!(s = GetFileSize(path, &(*result)[result_size].size_bytes)).ok()) {
130       if (FileExists(path).IsNotFound()) {
131         // The file may have been deleted since we listed the directory
132         continue;
133       }
134       return s;
135     }
136     (*result)[result_size].name = std::move(child_fnames[i]);
137     result_size++;
138   }
139   result->resize(result_size);
140   return Status::OK();
141 }
142 
~SequentialFile()143 SequentialFile::~SequentialFile() {
144 }
145 
~RandomAccessFile()146 RandomAccessFile::~RandomAccessFile() {
147 }
148 
~WritableFile()149 WritableFile::~WritableFile() {
150 }
151 
~MemoryMappedFileBuffer()152 MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
153 
~Logger()154 Logger::~Logger() {}
155 
Close()156 Status Logger::Close() {
157   if (!closed_) {
158     closed_ = true;
159     return CloseImpl();
160   } else {
161     return Status::OK();
162   }
163 }
164 
CloseImpl()165 Status Logger::CloseImpl() { return Status::NotSupported(); }
166 
~FileLock()167 FileLock::~FileLock() {
168 }
169 
LogFlush(Logger * info_log)170 void LogFlush(Logger *info_log) {
171   if (info_log) {
172     info_log->Flush();
173   }
174 }
175 
Logv(Logger * info_log,const char * format,va_list ap)176 static void Logv(Logger *info_log, const char* format, va_list ap) {
177   if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
178     info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
179   }
180 }
181 
Log(Logger * info_log,const char * format,...)182 void Log(Logger* info_log, const char* format, ...) {
183   va_list ap;
184   va_start(ap, format);
185   Logv(info_log, format, ap);
186   va_end(ap);
187 }
188 
Logv(const InfoLogLevel log_level,const char * format,va_list ap)189 void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
190   static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN",
191     "ERROR", "FATAL" };
192   if (log_level < log_level_) {
193     return;
194   }
195 
196   if (log_level == InfoLogLevel::INFO_LEVEL) {
197     // Doesn't print log level if it is INFO level.
198     // This is to avoid unexpected performance regression after we add
199     // the feature of log level. All the logs before we add the feature
200     // are INFO level. We don't want to add extra costs to those existing
201     // logging.
202     Logv(format, ap);
203   } else if (log_level == InfoLogLevel::HEADER_LEVEL) {
204     LogHeader(format, ap);
205   } else {
206     char new_format[500];
207     snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
208       kInfoLogLevelNames[log_level], format);
209     Logv(new_format, ap);
210   }
211 }
212 
Logv(const InfoLogLevel log_level,Logger * info_log,const char * format,va_list ap)213 static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) {
214   if (info_log && info_log->GetInfoLogLevel() <= log_level) {
215     if (log_level == InfoLogLevel::HEADER_LEVEL) {
216       info_log->LogHeader(format, ap);
217     } else {
218       info_log->Logv(log_level, format, ap);
219     }
220   }
221 }
222 
Log(const InfoLogLevel log_level,Logger * info_log,const char * format,...)223 void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
224          ...) {
225   va_list ap;
226   va_start(ap, format);
227   Logv(log_level, info_log, format, ap);
228   va_end(ap);
229 }
230 
Headerv(Logger * info_log,const char * format,va_list ap)231 static void Headerv(Logger *info_log, const char *format, va_list ap) {
232   if (info_log) {
233     info_log->LogHeader(format, ap);
234   }
235 }
236 
Header(Logger * info_log,const char * format,...)237 void Header(Logger* info_log, const char* format, ...) {
238   va_list ap;
239   va_start(ap, format);
240   Headerv(info_log, format, ap);
241   va_end(ap);
242 }
243 
Debugv(Logger * info_log,const char * format,va_list ap)244 static void Debugv(Logger* info_log, const char* format, va_list ap) {
245   if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
246     info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
247   }
248 }
249 
Debug(Logger * info_log,const char * format,...)250 void Debug(Logger* info_log, const char* format, ...) {
251   va_list ap;
252   va_start(ap, format);
253   Debugv(info_log, format, ap);
254   va_end(ap);
255 }
256 
Infov(Logger * info_log,const char * format,va_list ap)257 static void Infov(Logger* info_log, const char* format, va_list ap) {
258   if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
259     info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
260   }
261 }
262 
Info(Logger * info_log,const char * format,...)263 void Info(Logger* info_log, const char* format, ...) {
264   va_list ap;
265   va_start(ap, format);
266   Infov(info_log, format, ap);
267   va_end(ap);
268 }
269 
Warnv(Logger * info_log,const char * format,va_list ap)270 static void Warnv(Logger* info_log, const char* format, va_list ap) {
271   if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::WARN_LEVEL) {
272     info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
273   }
274 }
275 
Warn(Logger * info_log,const char * format,...)276 void Warn(Logger* info_log, const char* format, ...) {
277   va_list ap;
278   va_start(ap, format);
279   Warnv(info_log, format, ap);
280   va_end(ap);
281 }
282 
Errorv(Logger * info_log,const char * format,va_list ap)283 static void Errorv(Logger* info_log, const char* format, va_list ap) {
284   if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::ERROR_LEVEL) {
285     info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
286   }
287 }
288 
Error(Logger * info_log,const char * format,...)289 void Error(Logger* info_log, const char* format, ...) {
290   va_list ap;
291   va_start(ap, format);
292   Errorv(info_log, format, ap);
293   va_end(ap);
294 }
295 
Fatalv(Logger * info_log,const char * format,va_list ap)296 static void Fatalv(Logger* info_log, const char* format, va_list ap) {
297   if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::FATAL_LEVEL) {
298     info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
299   }
300 }
301 
Fatal(Logger * info_log,const char * format,...)302 void Fatal(Logger* info_log, const char* format, ...) {
303   va_list ap;
304   va_start(ap, format);
305   Fatalv(info_log, format, ap);
306   va_end(ap);
307 }
308 
LogFlush(const std::shared_ptr<Logger> & info_log)309 void LogFlush(const std::shared_ptr<Logger>& info_log) {
310   LogFlush(info_log.get());
311 }
312 
Log(const InfoLogLevel log_level,const std::shared_ptr<Logger> & info_log,const char * format,...)313 void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
314          const char* format, ...) {
315   va_list ap;
316   va_start(ap, format);
317   Logv(log_level, info_log.get(), format, ap);
318   va_end(ap);
319 }
320 
Header(const std::shared_ptr<Logger> & info_log,const char * format,...)321 void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
322   va_list ap;
323   va_start(ap, format);
324   Headerv(info_log.get(), format, ap);
325   va_end(ap);
326 }
327 
Debug(const std::shared_ptr<Logger> & info_log,const char * format,...)328 void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
329   va_list ap;
330   va_start(ap, format);
331   Debugv(info_log.get(), format, ap);
332   va_end(ap);
333 }
334 
Info(const std::shared_ptr<Logger> & info_log,const char * format,...)335 void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
336   va_list ap;
337   va_start(ap, format);
338   Infov(info_log.get(), format, ap);
339   va_end(ap);
340 }
341 
Warn(const std::shared_ptr<Logger> & info_log,const char * format,...)342 void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
343   va_list ap;
344   va_start(ap, format);
345   Warnv(info_log.get(), format, ap);
346   va_end(ap);
347 }
348 
Error(const std::shared_ptr<Logger> & info_log,const char * format,...)349 void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
350   va_list ap;
351   va_start(ap, format);
352   Errorv(info_log.get(), format, ap);
353   va_end(ap);
354 }
355 
Fatal(const std::shared_ptr<Logger> & info_log,const char * format,...)356 void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
357   va_list ap;
358   va_start(ap, format);
359   Fatalv(info_log.get(), format, ap);
360   va_end(ap);
361 }
362 
Log(const std::shared_ptr<Logger> & info_log,const char * format,...)363 void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
364   va_list ap;
365   va_start(ap, format);
366   Logv(info_log.get(), format, ap);
367   va_end(ap);
368 }
369 
WriteStringToFile(Env * env,const Slice & data,const std::string & fname,bool should_sync)370 Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
371                          bool should_sync) {
372   LegacyFileSystemWrapper lfsw(env);
373   return WriteStringToFile(&lfsw, data, fname, should_sync);
374 }
375 
ReadFileToString(Env * env,const std::string & fname,std::string * data)376 Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
377   LegacyFileSystemWrapper lfsw(env);
378   return ReadFileToString(&lfsw, fname, data);
379 }
380 
~EnvWrapper()381 EnvWrapper::~EnvWrapper() {
382 }
383 
384 namespace {  // anonymous namespace
385 
AssignEnvOptions(EnvOptions * env_options,const DBOptions & options)386 void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
387   env_options->use_mmap_reads = options.allow_mmap_reads;
388   env_options->use_mmap_writes = options.allow_mmap_writes;
389   env_options->use_direct_reads = options.use_direct_reads;
390   env_options->set_fd_cloexec = options.is_fd_close_on_exec;
391   env_options->bytes_per_sync = options.bytes_per_sync;
392   env_options->compaction_readahead_size = options.compaction_readahead_size;
393   env_options->random_access_max_buffer_size =
394       options.random_access_max_buffer_size;
395   env_options->rate_limiter = options.rate_limiter.get();
396   env_options->writable_file_max_buffer_size =
397       options.writable_file_max_buffer_size;
398   env_options->allow_fallocate = options.allow_fallocate;
399   env_options->strict_bytes_per_sync = options.strict_bytes_per_sync;
400   options.env->SanitizeEnvOptions(env_options);
401 }
402 
403 }
404 
// Returns a copy of `env_options` for log writes: bytes_per_sync is taken
// from db_options.wal_bytes_per_sync and the writable-file buffer size from
// db_options.writable_file_max_buffer_size.
EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
                                    const DBOptions& db_options) const {
  EnvOptions tuned(env_options);
  tuned.writable_file_max_buffer_size =
      db_options.writable_file_max_buffer_size;
  tuned.bytes_per_sync = db_options.wal_bytes_per_sync;
  return tuned;
}
413 
// Returns `env_options` unchanged; this default applies no tuning for
// manifest writes.
EnvOptions Env::OptimizeForManifestWrite(
    const EnvOptions& env_options) const {
  return env_options;
}
417 
// Returns a copy of `env_options` with direct reads turned off.
EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
  EnvOptions tuned(env_options);
  tuned.use_direct_reads = false;
  return tuned;
}
423 
// Returns a copy of `env_options` with direct reads turned off.
EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
  EnvOptions tuned(env_options);
  tuned.use_direct_reads = false;
  return tuned;
}
429 
// Returns a copy of `env_options` whose use_direct_writes flag follows
// db_options.use_direct_io_for_flush_and_compaction.
EnvOptions Env::OptimizeForCompactionTableWrite(
    const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
  EnvOptions tuned(env_options);
  tuned.use_direct_writes = db_options.use_direct_io_for_flush_and_compaction;
  return tuned;
}
437 
// Returns a copy of `env_options` whose use_direct_reads flag follows
// db_options.use_direct_reads.
EnvOptions Env::OptimizeForCompactionTableRead(
    const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
  EnvOptions tuned(env_options);
  tuned.use_direct_reads = db_options.use_direct_reads;
  return tuned;
}
444 
EnvOptions(const DBOptions & options)445 EnvOptions::EnvOptions(const DBOptions& options) {
446   AssignEnvOptions(this, options);
447 }
448 
EnvOptions()449 EnvOptions::EnvOptions() {
450   DBOptions options;
451   AssignEnvOptions(this, options);
452 }
453 
NewEnvLogger(const std::string & fname,Env * env,std::shared_ptr<Logger> * result)454 Status NewEnvLogger(const std::string& fname, Env* env,
455                     std::shared_ptr<Logger>* result) {
456   EnvOptions options;
457   // TODO: Tune the buffer size.
458   options.writable_file_max_buffer_size = 1024 * 1024;
459   std::unique_ptr<WritableFile> writable_file;
460   const auto status = env->NewWritableFile(fname, &writable_file, options);
461   if (!status.ok()) {
462     return status;
463   }
464 
465   *result = std::make_shared<EnvLogger>(
466       NewLegacyWritableFileWrapper(std::move(writable_file)), fname, options,
467       env);
468   return Status::OK();
469 }
470 
GetFileSystem() const471 const std::shared_ptr<FileSystem>& Env::GetFileSystem() const {
472   return file_system_;
473 }
474 
475 #ifdef OS_WIN
NewCompositeEnv(std::shared_ptr<FileSystem> fs)476 std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
477   return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
478 }
479 #endif
480 
481 }  // namespace ROCKSDB_NAMESPACE
482