1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 // 10 // Logger implementation that uses custom Env object for logging. 11 12 #pragma once 13 14 #include <time.h> 15 #include <atomic> 16 #include <memory> 17 #include "port/sys_time.h" 18 19 #include "file/writable_file_writer.h" 20 #include "monitoring/iostats_context_imp.h" 21 #include "rocksdb/env.h" 22 #include "rocksdb/slice.h" 23 #include "test_util/sync_point.h" 24 #include "util/mutexlock.h" 25 26 namespace ROCKSDB_NAMESPACE { 27 28 class EnvLogger : public Logger { 29 public: 30 EnvLogger(std::unique_ptr<FSWritableFile>&& writable_file, 31 const std::string& fname, const EnvOptions& options, Env* env, 32 InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) Logger(log_level)33 : Logger(log_level), 34 file_(std::move(writable_file), fname, options, env), 35 last_flush_micros_(0), 36 env_(env), 37 flush_pending_(false) {} 38 ~EnvLogger()39 ~EnvLogger() { 40 if (!closed_) { 41 closed_ = true; 42 CloseHelper(); 43 } 44 } 45 46 private: FlushLocked()47 void FlushLocked() { 48 mutex_.AssertHeld(); 49 if (flush_pending_) { 50 flush_pending_ = false; 51 file_.Flush(); 52 } 53 last_flush_micros_ = env_->NowMicros(); 54 } 55 Flush()56 void Flush() override { 57 TEST_SYNC_POINT("EnvLogger::Flush:Begin1"); 58 TEST_SYNC_POINT("EnvLogger::Flush:Begin2"); 59 60 MutexLock l(&mutex_); 61 FlushLocked(); 62 } 63 CloseImpl()64 Status CloseImpl() override { return CloseHelper(); } 65 CloseHelper()66 Status CloseHelper() { 67 mutex_.Lock(); 68 const auto close_status = file_.Close(); 69 mutex_.Unlock(); 70 71 if (close_status.ok()) { 72 return close_status; 73 } 74 return Status::IOError("Close of log file failed with error:" + 75 (close_status.getState() 76 ? std::string(close_status.getState()) 77 : std::string())); 78 } 79 80 using Logger::Logv; Logv(const char * format,va_list ap)81 void Logv(const char* format, va_list ap) override { 82 IOSTATS_TIMER_GUARD(logger_nanos); 83 84 const uint64_t thread_id = env_->GetThreadID(); 85 86 // We try twice: the first time with a fixed-size stack allocated buffer, 87 // and the second time with a much larger dynamically allocated buffer. 88 char buffer[500]; 89 for (int iter = 0; iter < 2; iter++) { 90 char* base; 91 int bufsize; 92 if (iter == 0) { 93 bufsize = sizeof(buffer); 94 base = buffer; 95 } else { 96 bufsize = 65536; 97 base = new char[bufsize]; 98 } 99 char* p = base; 100 char* limit = base + bufsize; 101 102 struct timeval now_tv; 103 gettimeofday(&now_tv, nullptr); 104 const time_t seconds = now_tv.tv_sec; 105 struct tm t; 106 localtime_r(&seconds, &t); 107 p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", 108 t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, 109 t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec), 110 static_cast<long long unsigned int>(thread_id)); 111 112 // Print the message 113 if (p < limit) { 114 va_list backup_ap; 115 va_copy(backup_ap, ap); 116 p += vsnprintf(p, limit - p, format, backup_ap); 117 va_end(backup_ap); 118 } 119 120 // Truncate to available space if necessary 121 if (p >= limit) { 122 if (iter == 0) { 123 continue; // Try again with larger buffer 124 } else { 125 p = limit - 1; 126 } 127 } 128 129 // Add newline if necessary 130 if (p == base || p[-1] != '\n') { 131 *p++ = '\n'; 132 } 133 134 assert(p <= limit); 135 mutex_.Lock(); 136 // We will ignore any error returned by Append(). 137 file_.Append(Slice(base, p - base)); 138 flush_pending_ = true; 139 const uint64_t now_micros = env_->NowMicros(); 140 if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { 141 FlushLocked(); 142 } 143 mutex_.Unlock(); 144 if (base != buffer) { 145 delete[] base; 146 } 147 break; 148 } 149 } 150 GetLogFileSize()151 size_t GetLogFileSize() const override { 152 MutexLock l(&mutex_); 153 return file_.GetFileSize(); 154 } 155 156 private: 157 WritableFileWriter file_; 158 mutable port::Mutex mutex_; // Mutex to protect the shared variables below. 159 const static uint64_t flush_every_seconds_ = 5; 160 std::atomic_uint_fast64_t last_flush_micros_; 161 Env* env_; 162 std::atomic<bool> flush_pending_; 163 }; 164 165 } // namespace ROCKSDB_NAMESPACE 166