1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #pragma once
11 #include <atomic>
12 #include <string>
13 #include "db/version_edit.h"
14 #include "port/port.h"
15 #include "rocksdb/env.h"
16 #include "rocksdb/file_checksum.h"
17 #include "rocksdb/file_system.h"
18 #include "rocksdb/io_status.h"
19 #include "rocksdb/listener.h"
20 #include "rocksdb/rate_limiter.h"
21 #include "test_util/sync_point.h"
22 #include "util/aligned_buffer.h"
23 
24 namespace ROCKSDB_NAMESPACE {
25 class Statistics;
26 
// WritableFileWriter is a wrapper on top of FSWritableFile. It provides
28 // facilities to:
29 // - Handle Buffered and Direct writes.
30 // - Rate limit writes.
31 // - Flush and Sync the data to the underlying filesystem.
32 // - Notify any interested listeners on the completion of a write.
33 // - Update IO stats.
34 class WritableFileWriter {
35  private:
36 #ifndef ROCKSDB_LITE
NotifyOnFileWriteFinish(uint64_t offset,size_t length,const FileOperationInfo::TimePoint & start_ts,const FileOperationInfo::TimePoint & finish_ts,const IOStatus & io_status)37   void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
38                                const FileOperationInfo::TimePoint& start_ts,
39                                const FileOperationInfo::TimePoint& finish_ts,
40                                const IOStatus& io_status) {
41     FileOperationInfo info(file_name_, start_ts, finish_ts);
42     info.offset = offset;
43     info.length = length;
44     info.status = io_status;
45 
46     for (auto& listener : listeners_) {
47       listener->OnFileWriteFinish(info);
48     }
49   }
50 #endif  // ROCKSDB_LITE
51 
ShouldNotifyListeners()52   bool ShouldNotifyListeners() const { return !listeners_.empty(); }
53   void CalculateFileChecksum(const Slice& data);
54 
55   std::unique_ptr<FSWritableFile> writable_file_;
56   std::string file_name_;
57   Env* env_;
58   AlignedBuffer buf_;
59   size_t max_buffer_size_;
60   // Actually written data size can be used for truncate
61   // not counting padding data
62   uint64_t filesize_;
63 #ifndef ROCKSDB_LITE
64   // This is necessary when we use unbuffered access
65   // and writes must happen on aligned offsets
66   // so we need to go back and write that page again
67   uint64_t next_write_offset_;
68 #endif  // ROCKSDB_LITE
69   bool pending_sync_;
70   uint64_t last_sync_size_;
71   uint64_t bytes_per_sync_;
72   RateLimiter* rate_limiter_;
73   Statistics* stats_;
74   std::vector<std::shared_ptr<EventListener>> listeners_;
75   std::unique_ptr<FileChecksumGenerator> checksum_generator_;
76   bool checksum_finalized_;
77 
78  public:
79   WritableFileWriter(
80       std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
81       const FileOptions& options, Env* env = nullptr,
82       Statistics* stats = nullptr,
83       const std::vector<std::shared_ptr<EventListener>>& listeners = {},
84       FileChecksumGenFactory* file_checksum_gen_factory = nullptr)
writable_file_(std::move (file))85       : writable_file_(std::move(file)),
86         file_name_(_file_name),
87         env_(env),
88         buf_(),
89         max_buffer_size_(options.writable_file_max_buffer_size),
90         filesize_(0),
91 #ifndef ROCKSDB_LITE
92         next_write_offset_(0),
93 #endif  // ROCKSDB_LITE
94         pending_sync_(false),
95         last_sync_size_(0),
96         bytes_per_sync_(options.bytes_per_sync),
97         rate_limiter_(options.rate_limiter),
98         stats_(stats),
99         listeners_(),
100         checksum_generator_(nullptr),
101         checksum_finalized_(false) {
102     TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
103                              reinterpret_cast<void*>(max_buffer_size_));
104     buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
105     buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
106 #ifndef ROCKSDB_LITE
107     std::for_each(listeners.begin(), listeners.end(),
108                   [this](const std::shared_ptr<EventListener>& e) {
109                     if (e->ShouldBeNotifiedOnFileIO()) {
110                       listeners_.emplace_back(e);
111                     }
112                   });
113 #else  // !ROCKSDB_LITE
114     (void)listeners;
115 #endif
116     if (file_checksum_gen_factory != nullptr) {
117       FileChecksumGenContext checksum_gen_context;
118       checksum_gen_context.file_name = _file_name;
119       checksum_generator_ =
120           file_checksum_gen_factory->CreateFileChecksumGenerator(
121               checksum_gen_context);
122     }
123   }
124 
125   WritableFileWriter(const WritableFileWriter&) = delete;
126 
127   WritableFileWriter& operator=(const WritableFileWriter&) = delete;
128 
~WritableFileWriter()129   ~WritableFileWriter() { Close(); }
130 
file_name()131   std::string file_name() const { return file_name_; }
132 
133   IOStatus Append(const Slice& data);
134 
135   IOStatus Pad(const size_t pad_bytes);
136 
137   IOStatus Flush();
138 
139   IOStatus Close();
140 
141   IOStatus Sync(bool use_fsync);
142 
143   // Sync only the data that was already Flush()ed. Safe to call concurrently
144   // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
145   // returns NotSupported status.
146   IOStatus SyncWithoutFlush(bool use_fsync);
147 
GetFileSize()148   uint64_t GetFileSize() const { return filesize_; }
149 
InvalidateCache(size_t offset,size_t length)150   IOStatus InvalidateCache(size_t offset, size_t length) {
151     return writable_file_->InvalidateCache(offset, length);
152   }
153 
writable_file()154   FSWritableFile* writable_file() const { return writable_file_.get(); }
155 
use_direct_io()156   bool use_direct_io() { return writable_file_->use_direct_io(); }
157 
TEST_BufferIsEmpty()158   bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
159 
TEST_SetFileChecksumGenerator(FileChecksumGenerator * checksum_generator)160   void TEST_SetFileChecksumGenerator(
161       FileChecksumGenerator* checksum_generator) {
162     checksum_generator_.reset(checksum_generator);
163   }
164 
165   std::string GetFileChecksum();
166 
167   const char* GetFileChecksumFuncName() const;
168 
169  private:
170   // Used when os buffering is OFF and we are writing
171   // DMA such as in Direct I/O mode
172 #ifndef ROCKSDB_LITE
173   IOStatus WriteDirect();
174 #endif  // !ROCKSDB_LITE
175   // Normal write
176   IOStatus WriteBuffered(const char* data, size_t size);
177   IOStatus RangeSync(uint64_t offset, uint64_t nbytes);
178   IOStatus SyncInternal(bool use_fsync);
179 };
180 }  // namespace ROCKSDB_NAMESPACE
181