// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#pragma once
#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <vector>

#include "db/version_edit.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"

namespace ROCKSDB_NAMESPACE {
class Statistics;

// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
// facilities to:
// - Handle Buffered and Direct writes.
// - Rate limit writes.
// - Flush and Sync the data to the underlying filesystem.
// - Notify any interested listeners on the completion of a write.
// - Update IO stats.
34 class WritableFileWriter { 35 private: 36 #ifndef ROCKSDB_LITE NotifyOnFileWriteFinish(uint64_t offset,size_t length,const FileOperationInfo::TimePoint & start_ts,const FileOperationInfo::TimePoint & finish_ts,const IOStatus & io_status)37 void NotifyOnFileWriteFinish(uint64_t offset, size_t length, 38 const FileOperationInfo::TimePoint& start_ts, 39 const FileOperationInfo::TimePoint& finish_ts, 40 const IOStatus& io_status) { 41 FileOperationInfo info(file_name_, start_ts, finish_ts); 42 info.offset = offset; 43 info.length = length; 44 info.status = io_status; 45 46 for (auto& listener : listeners_) { 47 listener->OnFileWriteFinish(info); 48 } 49 } 50 #endif // ROCKSDB_LITE 51 ShouldNotifyListeners()52 bool ShouldNotifyListeners() const { return !listeners_.empty(); } 53 void CalculateFileChecksum(const Slice& data); 54 55 std::unique_ptr<FSWritableFile> writable_file_; 56 std::string file_name_; 57 Env* env_; 58 AlignedBuffer buf_; 59 size_t max_buffer_size_; 60 // Actually written data size can be used for truncate 61 // not counting padding data 62 uint64_t filesize_; 63 #ifndef ROCKSDB_LITE 64 // This is necessary when we use unbuffered access 65 // and writes must happen on aligned offsets 66 // so we need to go back and write that page again 67 uint64_t next_write_offset_; 68 #endif // ROCKSDB_LITE 69 bool pending_sync_; 70 uint64_t last_sync_size_; 71 uint64_t bytes_per_sync_; 72 RateLimiter* rate_limiter_; 73 Statistics* stats_; 74 std::vector<std::shared_ptr<EventListener>> listeners_; 75 std::unique_ptr<FileChecksumGenerator> checksum_generator_; 76 bool checksum_finalized_; 77 78 public: 79 WritableFileWriter( 80 std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name, 81 const FileOptions& options, Env* env = nullptr, 82 Statistics* stats = nullptr, 83 const std::vector<std::shared_ptr<EventListener>>& listeners = {}, 84 FileChecksumGenFactory* file_checksum_gen_factory = nullptr) writable_file_(std::move (file))85 : 
writable_file_(std::move(file)), 86 file_name_(_file_name), 87 env_(env), 88 buf_(), 89 max_buffer_size_(options.writable_file_max_buffer_size), 90 filesize_(0), 91 #ifndef ROCKSDB_LITE 92 next_write_offset_(0), 93 #endif // ROCKSDB_LITE 94 pending_sync_(false), 95 last_sync_size_(0), 96 bytes_per_sync_(options.bytes_per_sync), 97 rate_limiter_(options.rate_limiter), 98 stats_(stats), 99 listeners_(), 100 checksum_generator_(nullptr), 101 checksum_finalized_(false) { 102 TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", 103 reinterpret_cast<void*>(max_buffer_size_)); 104 buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); 105 buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); 106 #ifndef ROCKSDB_LITE 107 std::for_each(listeners.begin(), listeners.end(), 108 [this](const std::shared_ptr<EventListener>& e) { 109 if (e->ShouldBeNotifiedOnFileIO()) { 110 listeners_.emplace_back(e); 111 } 112 }); 113 #else // !ROCKSDB_LITE 114 (void)listeners; 115 #endif 116 if (file_checksum_gen_factory != nullptr) { 117 FileChecksumGenContext checksum_gen_context; 118 checksum_gen_context.file_name = _file_name; 119 checksum_generator_ = 120 file_checksum_gen_factory->CreateFileChecksumGenerator( 121 checksum_gen_context); 122 } 123 } 124 125 WritableFileWriter(const WritableFileWriter&) = delete; 126 127 WritableFileWriter& operator=(const WritableFileWriter&) = delete; 128 ~WritableFileWriter()129 ~WritableFileWriter() { Close(); } 130 file_name()131 std::string file_name() const { return file_name_; } 132 133 IOStatus Append(const Slice& data); 134 135 IOStatus Pad(const size_t pad_bytes); 136 137 IOStatus Flush(); 138 139 IOStatus Close(); 140 141 IOStatus Sync(bool use_fsync); 142 143 // Sync only the data that was already Flush()ed. Safe to call concurrently 144 // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(), 145 // returns NotSupported status. 
146 IOStatus SyncWithoutFlush(bool use_fsync); 147 GetFileSize()148 uint64_t GetFileSize() const { return filesize_; } 149 InvalidateCache(size_t offset,size_t length)150 IOStatus InvalidateCache(size_t offset, size_t length) { 151 return writable_file_->InvalidateCache(offset, length); 152 } 153 writable_file()154 FSWritableFile* writable_file() const { return writable_file_.get(); } 155 use_direct_io()156 bool use_direct_io() { return writable_file_->use_direct_io(); } 157 TEST_BufferIsEmpty()158 bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } 159 TEST_SetFileChecksumGenerator(FileChecksumGenerator * checksum_generator)160 void TEST_SetFileChecksumGenerator( 161 FileChecksumGenerator* checksum_generator) { 162 checksum_generator_.reset(checksum_generator); 163 } 164 165 std::string GetFileChecksum(); 166 167 const char* GetFileChecksumFuncName() const; 168 169 private: 170 // Used when os buffering is OFF and we are writing 171 // DMA such as in Direct I/O mode 172 #ifndef ROCKSDB_LITE 173 IOStatus WriteDirect(); 174 #endif // !ROCKSDB_LITE 175 // Normal write 176 IOStatus WriteBuffered(const char* data, size_t size); 177 IOStatus RangeSync(uint64_t offset, uint64_t nbytes); 178 IOStatus SyncInternal(bool use_fsync); 179 }; 180 } // namespace ROCKSDB_NAMESPACE 181