1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "rocksdb/env.h"
11
12 #include <thread>
13 #include "env/composite_env_wrapper.h"
14 #include "logging/env_logger.h"
15 #include "memory/arena.h"
16 #include "options/db_options.h"
17 #include "port/port.h"
18 #include "port/sys_time.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/utilities/object_registry.h"
21 #include "util/autovector.h"
22
23 namespace ROCKSDB_NAMESPACE {
24
Env()25 Env::Env() : thread_status_updater_(nullptr) {
26 file_system_ = std::make_shared<LegacyFileSystemWrapper>(this);
27 }
28
Env(std::shared_ptr<FileSystem> fs)29 Env::Env(std::shared_ptr<FileSystem> fs)
30 : thread_status_updater_(nullptr),
31 file_system_(fs) {}
32
~Env()33 Env::~Env() {
34 }
35
NewLogger(const std::string & fname,std::shared_ptr<Logger> * result)36 Status Env::NewLogger(const std::string& fname,
37 std::shared_ptr<Logger>* result) {
38 return NewEnvLogger(fname, this, result);
39 }
40
LoadEnv(const std::string & value,Env ** result)41 Status Env::LoadEnv(const std::string& value, Env** result) {
42 Env* env = *result;
43 Status s;
44 #ifndef ROCKSDB_LITE
45 s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
46 #else
47 s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
48 #endif
49 if (s.ok()) {
50 *result = env;
51 }
52 return s;
53 }
54
LoadEnv(const std::string & value,Env ** result,std::shared_ptr<Env> * guard)55 Status Env::LoadEnv(const std::string& value, Env** result,
56 std::shared_ptr<Env>* guard) {
57 assert(result);
58 Status s;
59 #ifndef ROCKSDB_LITE
60 Env* env = nullptr;
61 std::unique_ptr<Env> uniq_guard;
62 std::string err_msg;
63 assert(guard != nullptr);
64 env = ObjectRegistry::NewInstance()->NewObject<Env>(value, &uniq_guard,
65 &err_msg);
66 if (!env) {
67 s = Status::NotFound(std::string("Cannot load ") + Env::Type() + ": " +
68 value);
69 env = Env::Default();
70 }
71 if (s.ok() && uniq_guard) {
72 guard->reset(uniq_guard.release());
73 *result = guard->get();
74 } else {
75 *result = env;
76 }
77 #else
78 (void)result;
79 (void)guard;
80 s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
81 #endif
82 return s;
83 }
84
PriorityToString(Env::Priority priority)85 std::string Env::PriorityToString(Env::Priority priority) {
86 switch (priority) {
87 case Env::Priority::BOTTOM:
88 return "Bottom";
89 case Env::Priority::LOW:
90 return "Low";
91 case Env::Priority::HIGH:
92 return "High";
93 case Env::Priority::USER:
94 return "User";
95 case Env::Priority::TOTAL:
96 assert(false);
97 }
98 return "Invalid";
99 }
100
GetThreadID() const101 uint64_t Env::GetThreadID() const {
102 std::hash<std::thread::id> hasher;
103 return hasher(std::this_thread::get_id());
104 }
105
ReuseWritableFile(const std::string & fname,const std::string & old_fname,std::unique_ptr<WritableFile> * result,const EnvOptions & options)106 Status Env::ReuseWritableFile(const std::string& fname,
107 const std::string& old_fname,
108 std::unique_ptr<WritableFile>* result,
109 const EnvOptions& options) {
110 Status s = RenameFile(old_fname, fname);
111 if (!s.ok()) {
112 return s;
113 }
114 return NewWritableFile(fname, result, options);
115 }
116
GetChildrenFileAttributes(const std::string & dir,std::vector<FileAttributes> * result)117 Status Env::GetChildrenFileAttributes(const std::string& dir,
118 std::vector<FileAttributes>* result) {
119 assert(result != nullptr);
120 std::vector<std::string> child_fnames;
121 Status s = GetChildren(dir, &child_fnames);
122 if (!s.ok()) {
123 return s;
124 }
125 result->resize(child_fnames.size());
126 size_t result_size = 0;
127 for (size_t i = 0; i < child_fnames.size(); ++i) {
128 const std::string path = dir + "/" + child_fnames[i];
129 if (!(s = GetFileSize(path, &(*result)[result_size].size_bytes)).ok()) {
130 if (FileExists(path).IsNotFound()) {
131 // The file may have been deleted since we listed the directory
132 continue;
133 }
134 return s;
135 }
136 (*result)[result_size].name = std::move(child_fnames[i]);
137 result_size++;
138 }
139 result->resize(result_size);
140 return Status::OK();
141 }
142
~SequentialFile()143 SequentialFile::~SequentialFile() {
144 }
145
~RandomAccessFile()146 RandomAccessFile::~RandomAccessFile() {
147 }
148
~WritableFile()149 WritableFile::~WritableFile() {
150 }
151
~MemoryMappedFileBuffer()152 MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
153
~Logger()154 Logger::~Logger() {}
155
Close()156 Status Logger::Close() {
157 if (!closed_) {
158 closed_ = true;
159 return CloseImpl();
160 } else {
161 return Status::OK();
162 }
163 }
164
CloseImpl()165 Status Logger::CloseImpl() { return Status::NotSupported(); }
166
~FileLock()167 FileLock::~FileLock() {
168 }
169
LogFlush(Logger * info_log)170 void LogFlush(Logger *info_log) {
171 if (info_log) {
172 info_log->Flush();
173 }
174 }
175
Logv(Logger * info_log,const char * format,va_list ap)176 static void Logv(Logger *info_log, const char* format, va_list ap) {
177 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
178 info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
179 }
180 }
181
Log(Logger * info_log,const char * format,...)182 void Log(Logger* info_log, const char* format, ...) {
183 va_list ap;
184 va_start(ap, format);
185 Logv(info_log, format, ap);
186 va_end(ap);
187 }
188
Logv(const InfoLogLevel log_level,const char * format,va_list ap)189 void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
190 static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN",
191 "ERROR", "FATAL" };
192 if (log_level < log_level_) {
193 return;
194 }
195
196 if (log_level == InfoLogLevel::INFO_LEVEL) {
197 // Doesn't print log level if it is INFO level.
198 // This is to avoid unexpected performance regression after we add
199 // the feature of log level. All the logs before we add the feature
200 // are INFO level. We don't want to add extra costs to those existing
201 // logging.
202 Logv(format, ap);
203 } else if (log_level == InfoLogLevel::HEADER_LEVEL) {
204 LogHeader(format, ap);
205 } else {
206 char new_format[500];
207 snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
208 kInfoLogLevelNames[log_level], format);
209 Logv(new_format, ap);
210 }
211 }
212
Logv(const InfoLogLevel log_level,Logger * info_log,const char * format,va_list ap)213 static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) {
214 if (info_log && info_log->GetInfoLogLevel() <= log_level) {
215 if (log_level == InfoLogLevel::HEADER_LEVEL) {
216 info_log->LogHeader(format, ap);
217 } else {
218 info_log->Logv(log_level, format, ap);
219 }
220 }
221 }
222
Log(const InfoLogLevel log_level,Logger * info_log,const char * format,...)223 void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
224 ...) {
225 va_list ap;
226 va_start(ap, format);
227 Logv(log_level, info_log, format, ap);
228 va_end(ap);
229 }
230
Headerv(Logger * info_log,const char * format,va_list ap)231 static void Headerv(Logger *info_log, const char *format, va_list ap) {
232 if (info_log) {
233 info_log->LogHeader(format, ap);
234 }
235 }
236
Header(Logger * info_log,const char * format,...)237 void Header(Logger* info_log, const char* format, ...) {
238 va_list ap;
239 va_start(ap, format);
240 Headerv(info_log, format, ap);
241 va_end(ap);
242 }
243
Debugv(Logger * info_log,const char * format,va_list ap)244 static void Debugv(Logger* info_log, const char* format, va_list ap) {
245 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
246 info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
247 }
248 }
249
Debug(Logger * info_log,const char * format,...)250 void Debug(Logger* info_log, const char* format, ...) {
251 va_list ap;
252 va_start(ap, format);
253 Debugv(info_log, format, ap);
254 va_end(ap);
255 }
256
Infov(Logger * info_log,const char * format,va_list ap)257 static void Infov(Logger* info_log, const char* format, va_list ap) {
258 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
259 info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
260 }
261 }
262
Info(Logger * info_log,const char * format,...)263 void Info(Logger* info_log, const char* format, ...) {
264 va_list ap;
265 va_start(ap, format);
266 Infov(info_log, format, ap);
267 va_end(ap);
268 }
269
Warnv(Logger * info_log,const char * format,va_list ap)270 static void Warnv(Logger* info_log, const char* format, va_list ap) {
271 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::WARN_LEVEL) {
272 info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
273 }
274 }
275
Warn(Logger * info_log,const char * format,...)276 void Warn(Logger* info_log, const char* format, ...) {
277 va_list ap;
278 va_start(ap, format);
279 Warnv(info_log, format, ap);
280 va_end(ap);
281 }
282
Errorv(Logger * info_log,const char * format,va_list ap)283 static void Errorv(Logger* info_log, const char* format, va_list ap) {
284 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::ERROR_LEVEL) {
285 info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
286 }
287 }
288
Error(Logger * info_log,const char * format,...)289 void Error(Logger* info_log, const char* format, ...) {
290 va_list ap;
291 va_start(ap, format);
292 Errorv(info_log, format, ap);
293 va_end(ap);
294 }
295
Fatalv(Logger * info_log,const char * format,va_list ap)296 static void Fatalv(Logger* info_log, const char* format, va_list ap) {
297 if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::FATAL_LEVEL) {
298 info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
299 }
300 }
301
Fatal(Logger * info_log,const char * format,...)302 void Fatal(Logger* info_log, const char* format, ...) {
303 va_list ap;
304 va_start(ap, format);
305 Fatalv(info_log, format, ap);
306 va_end(ap);
307 }
308
LogFlush(const std::shared_ptr<Logger> & info_log)309 void LogFlush(const std::shared_ptr<Logger>& info_log) {
310 LogFlush(info_log.get());
311 }
312
Log(const InfoLogLevel log_level,const std::shared_ptr<Logger> & info_log,const char * format,...)313 void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
314 const char* format, ...) {
315 va_list ap;
316 va_start(ap, format);
317 Logv(log_level, info_log.get(), format, ap);
318 va_end(ap);
319 }
320
Header(const std::shared_ptr<Logger> & info_log,const char * format,...)321 void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
322 va_list ap;
323 va_start(ap, format);
324 Headerv(info_log.get(), format, ap);
325 va_end(ap);
326 }
327
Debug(const std::shared_ptr<Logger> & info_log,const char * format,...)328 void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
329 va_list ap;
330 va_start(ap, format);
331 Debugv(info_log.get(), format, ap);
332 va_end(ap);
333 }
334
Info(const std::shared_ptr<Logger> & info_log,const char * format,...)335 void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
336 va_list ap;
337 va_start(ap, format);
338 Infov(info_log.get(), format, ap);
339 va_end(ap);
340 }
341
Warn(const std::shared_ptr<Logger> & info_log,const char * format,...)342 void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
343 va_list ap;
344 va_start(ap, format);
345 Warnv(info_log.get(), format, ap);
346 va_end(ap);
347 }
348
Error(const std::shared_ptr<Logger> & info_log,const char * format,...)349 void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
350 va_list ap;
351 va_start(ap, format);
352 Errorv(info_log.get(), format, ap);
353 va_end(ap);
354 }
355
Fatal(const std::shared_ptr<Logger> & info_log,const char * format,...)356 void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
357 va_list ap;
358 va_start(ap, format);
359 Fatalv(info_log.get(), format, ap);
360 va_end(ap);
361 }
362
Log(const std::shared_ptr<Logger> & info_log,const char * format,...)363 void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
364 va_list ap;
365 va_start(ap, format);
366 Logv(info_log.get(), format, ap);
367 va_end(ap);
368 }
369
WriteStringToFile(Env * env,const Slice & data,const std::string & fname,bool should_sync)370 Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
371 bool should_sync) {
372 LegacyFileSystemWrapper lfsw(env);
373 return WriteStringToFile(&lfsw, data, fname, should_sync);
374 }
375
ReadFileToString(Env * env,const std::string & fname,std::string * data)376 Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
377 LegacyFileSystemWrapper lfsw(env);
378 return ReadFileToString(&lfsw, fname, data);
379 }
380
~EnvWrapper()381 EnvWrapper::~EnvWrapper() {
382 }
383
384 namespace { // anonymous namespace
385
AssignEnvOptions(EnvOptions * env_options,const DBOptions & options)386 void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
387 env_options->use_mmap_reads = options.allow_mmap_reads;
388 env_options->use_mmap_writes = options.allow_mmap_writes;
389 env_options->use_direct_reads = options.use_direct_reads;
390 env_options->set_fd_cloexec = options.is_fd_close_on_exec;
391 env_options->bytes_per_sync = options.bytes_per_sync;
392 env_options->compaction_readahead_size = options.compaction_readahead_size;
393 env_options->random_access_max_buffer_size =
394 options.random_access_max_buffer_size;
395 env_options->rate_limiter = options.rate_limiter.get();
396 env_options->writable_file_max_buffer_size =
397 options.writable_file_max_buffer_size;
398 env_options->allow_fallocate = options.allow_fallocate;
399 env_options->strict_bytes_per_sync = options.strict_bytes_per_sync;
400 options.env->SanitizeEnvOptions(env_options);
401 }
402
403 }
404
OptimizeForLogWrite(const EnvOptions & env_options,const DBOptions & db_options) const405 EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
406 const DBOptions& db_options) const {
407 EnvOptions optimized_env_options(env_options);
408 optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync;
409 optimized_env_options.writable_file_max_buffer_size =
410 db_options.writable_file_max_buffer_size;
411 return optimized_env_options;
412 }
413
OptimizeForManifestWrite(const EnvOptions & env_options) const414 EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
415 return env_options;
416 }
417
OptimizeForLogRead(const EnvOptions & env_options) const418 EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
419 EnvOptions optimized_env_options(env_options);
420 optimized_env_options.use_direct_reads = false;
421 return optimized_env_options;
422 }
423
OptimizeForManifestRead(const EnvOptions & env_options) const424 EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
425 EnvOptions optimized_env_options(env_options);
426 optimized_env_options.use_direct_reads = false;
427 return optimized_env_options;
428 }
429
OptimizeForCompactionTableWrite(const EnvOptions & env_options,const ImmutableDBOptions & db_options) const430 EnvOptions Env::OptimizeForCompactionTableWrite(
431 const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
432 EnvOptions optimized_env_options(env_options);
433 optimized_env_options.use_direct_writes =
434 db_options.use_direct_io_for_flush_and_compaction;
435 return optimized_env_options;
436 }
437
OptimizeForCompactionTableRead(const EnvOptions & env_options,const ImmutableDBOptions & db_options) const438 EnvOptions Env::OptimizeForCompactionTableRead(
439 const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
440 EnvOptions optimized_env_options(env_options);
441 optimized_env_options.use_direct_reads = db_options.use_direct_reads;
442 return optimized_env_options;
443 }
444
EnvOptions(const DBOptions & options)445 EnvOptions::EnvOptions(const DBOptions& options) {
446 AssignEnvOptions(this, options);
447 }
448
EnvOptions()449 EnvOptions::EnvOptions() {
450 DBOptions options;
451 AssignEnvOptions(this, options);
452 }
453
NewEnvLogger(const std::string & fname,Env * env,std::shared_ptr<Logger> * result)454 Status NewEnvLogger(const std::string& fname, Env* env,
455 std::shared_ptr<Logger>* result) {
456 EnvOptions options;
457 // TODO: Tune the buffer size.
458 options.writable_file_max_buffer_size = 1024 * 1024;
459 std::unique_ptr<WritableFile> writable_file;
460 const auto status = env->NewWritableFile(fname, &writable_file, options);
461 if (!status.ok()) {
462 return status;
463 }
464
465 *result = std::make_shared<EnvLogger>(
466 NewLegacyWritableFileWrapper(std::move(writable_file)), fname, options,
467 env);
468 return Status::OK();
469 }
470
GetFileSystem() const471 const std::shared_ptr<FileSystem>& Env::GetFileSystem() const {
472 return file_system_;
473 }
474
475 #ifdef OS_WIN
NewCompositeEnv(std::shared_ptr<FileSystem> fs)476 std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
477 return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
478 }
479 #endif
480
481 } // namespace ROCKSDB_NAMESPACE
482