// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#ifndef ROCKSDB_LITE

#include "db/transaction_log_impl.h"
#include <cinttypes>
#include "db/write_batch_internal.h"
#include "file/sequence_file_reader.h"

namespace ROCKSDB_NAMESPACE {

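// TransactionLogIteratorImpl backs the TransactionLogIterator handles that
// callers obtain via DB::GetUpdatesSince(). An illustrative usage sketch
// (`db` and `start_seq` are placeholders, not part of this file):
//
//   std::unique_ptr<TransactionLogIterator> iter;
//   Status s = db->GetUpdatesSince(start_seq, &iter);
//   if (s.ok()) {
//     while (iter->Valid()) {
//       BatchResult batch = iter->GetBatch();  // moves out the current batch
//       // ... apply batch.writeBatchPtr, starting at batch.sequence ...
//       iter->Next();
//     }
//     s = iter->status();  // OK at the tail, else TryAgain or Corruption
//   }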
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const ImmutableDBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
    const bool seq_per_batch)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      starting_sequence_number_(seq),
      files_(std::move(files)),
      started_(false),
      is_valid_(false),
      current_file_index_(0),
      current_batch_seq_(0),
      current_last_seq_(0),
      versions_(versions),
      seq_per_batch_(seq_per_batch) {
  assert(files_ != nullptr);
  assert(versions_ != nullptr);

  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();
  SeekToStartSequence();  // Seek to the starting sequence
}

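// Opens the WAL file backing `log_file` for sequential reads. A live file may
// be archived while the iterator is in use, so a failed open in the DB
// directory falls back to the archive directory.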
Status TransactionLogIteratorImpl::OpenLogFile(
    const LogFile* log_file,
    std::unique_ptr<SequentialFileReader>* file_reader) {
  FileSystem* fs = options_->fs.get();
  std::unique_ptr<FSSequentialFile> file;
  std::string fname;
  Status s;
  EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
  if (log_file->Type() == kArchivedLogFile) {
    fname = ArchivedLogFileName(dir_, log_file->LogNumber());
    s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  } else {
    fname = LogFileName(dir_, log_file->LogNumber());
    s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
    if (!s.ok()) {
      // If the file cannot be opened in the DB directory, try the archive
      // directory, as it could have been moved there in the meantime.
      fname = ArchivedLogFileName(dir_, log_file->LogNumber());
      s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
    }
  }
  if (s.ok()) {
    file_reader->reset(new SequentialFileReader(std::move(file), fname));
  }
  return s;
}

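// Returns the current batch and its starting sequence number. Ownership of
// the WriteBatch is moved to the caller, so the batch can be retrieved only
// once per position; call Next() to advance before reading again.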
BatchResult TransactionLogIteratorImpl::GetBatch() {
  assert(is_valid_);  // cannot be called in an invalid state
  BatchResult result;
  result.sequence = current_batch_seq_;
  result.writeBatchPtr = std::move(current_batch_);
  return result;
}

Status TransactionLogIteratorImpl::status() { return current_status_; }

bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }

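// Reads the next physical record from the current log reader, but stops once
// the iterator has already returned everything up to the DB's last published
// sequence number, so callers never see entries beyond
// versions_->LastSequence().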
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
  // Don't read if no more complete entries to read from logs
  if (current_last_seq_ >= versions_->LastSequence()) {
    return false;
  }
  return current_log_reader_->ReadRecord(record, &scratch_);
}

void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
                                                     bool strict) {
  Slice record;
  started_ = false;
  is_valid_ = false;
  if (files_->size() <= start_file_index) {
    return;
  }
  Status s =
      OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
  if (!s.ok()) {
    current_status_ = s;
    reporter_.Info(current_status_.ToString().c_str());
    return;
  }
  while (RestrictedRead(&record)) {
    if (record.size() < WriteBatchInternal::kHeader) {
      reporter_.Corruption(record.size(),
                           Status::Corruption("very small log record"));
      continue;
    }
    UpdateCurrentWriteBatch(record);
    if (current_last_seq_ >= starting_sequence_number_) {
      if (strict && current_batch_seq_ != starting_sequence_number_) {
        current_status_ = Status::Corruption(
            "Gap in sequence number. Could not "
            "seek to required sequence number");
        reporter_.Info(current_status_.ToString().c_str());
        return;
      } else if (strict) {
        reporter_.Info(
            "Could seek to the required sequence number. Iterator will "
            "continue.");
      }
      is_valid_ = true;
      started_ = true;  // set started_ as we could seek till starting sequence
      return;
    } else {
      is_valid_ = false;
    }
  }

  // Could not find the start sequence in the first file. Normally this must
  // be the only file; otherwise log the error and let the iterator return the
  // next available entry. If strict is set, we want to seek exactly to the
  // start sequence, and it should have been present in the file scanned above.
  if (strict) {
    current_status_ = Status::Corruption(
        "Gap in sequence number. Could not "
        "seek to required sequence number");
    reporter_.Info(current_status_.ToString().c_str());
  } else if (files_->size() != 1) {
    current_status_ = Status::Corruption(
        "Start sequence was not found, "
        "skipping to the next available");
    reporter_.Info(current_status_.ToString().c_str());
    // Let NextImpl find the next available entry. started_ remains false
    // because we don't want to check for gaps while moving to the start
    // sequence.
    NextImpl(true);
  }
}

void TransactionLogIteratorImpl::Next() {
  return NextImpl(false);
}

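// Advances the iterator to the next write batch, rolling over to the next log
// file when the current one is exhausted. `internal` is true when called from
// SeekToStartSequence() to skip ahead before the iterator has started; in that
// mode the sequence-gap check in UpdateCurrentWriteBatch() is skipped until
// started_ is set.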
void TransactionLogIteratorImpl::NextImpl(bool internal) {
  Slice record;
  is_valid_ = false;
  if (!internal && !started_) {
    // Runs every time until we can seek to the start sequence
    return SeekToStartSequence();
  }
  while (true) {
    assert(current_log_reader_);
    if (current_log_reader_->IsEOF()) {
      current_log_reader_->UnmarkEOF();
    }
    while (RestrictedRead(&record)) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter_.Corruption(record.size(),
                             Status::Corruption("very small log record"));
        continue;
      } else {
        // started_ should be true if called by application
        assert(internal || started_);
        // started_ should be false if called internally
        assert(!internal || !started_);
        UpdateCurrentWriteBatch(record);
        if (internal && !started_) {
          started_ = true;
        }
        return;
      }
    }

    // Open the next file
    if (current_file_index_ < files_->size() - 1) {
      ++current_file_index_;
      Status s = OpenLogReader(files_->at(current_file_index_).get());
      if (!s.ok()) {
        is_valid_ = false;
        current_status_ = s;
        return;
      }
    } else {
      is_valid_ = false;
      if (current_last_seq_ == versions_->LastSequence()) {
        current_status_ = Status::OK();
      } else {
        const char* msg = "Create a new iterator to fetch the new tail.";
        current_status_ = Status::TryAgain(msg);
      }
      return;
    }
  }
}

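// Returns true iff `batch` starts exactly at `expected_seq`; otherwise logs
// the discontinuity and returns false so the caller can reseek.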
bool TransactionLogIteratorImpl::IsBatchExpected(
    const WriteBatch* batch, const SequenceNumber expected_seq) {
  assert(batch);
  SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
  if (batchSeq != expected_seq) {
    char buf[200];
    snprintf(buf, sizeof(buf),
             "Discontinuity in log records. Got seq=%" PRIu64
             ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
             ". Log iterator will reseek to the correct batch.",
             batchSeq, expected_seq, versions_->LastSequence());
    reporter_.Info(buf);
    return false;
  }
  return true;
}

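// Decodes `record` into a WriteBatch and updates current_batch_seq_ /
// current_last_seq_. If a sequence gap is detected after the iterator has
// started, the iterator reseeks to the expected sequence number instead of
// surfacing the out-of-order batch.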
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
  std::unique_ptr<WriteBatch> batch(new WriteBatch());
  WriteBatchInternal::SetContents(batch.get(), record);

  SequenceNumber expected_seq = current_last_seq_ + 1;
  // If the iterator has started, then confirm that we get continuous batches
  if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
    // Seek to the batch having the expected sequence number
    if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
      // The expected batch must lie in the previous log file.
      // Avoid underflow.
      if (current_file_index_ != 0) {
        current_file_index_--;
      }
    }
    starting_sequence_number_ = expected_seq;
    // current_status_ will be set to OK if the reseek succeeds.
    // Note: this is still ok in seq_per_batch_ && two_write_queues_ mode
    // that allows gaps in the WAL since it will still skip over the gap.
    current_status_ = Status::NotFound("Gap in sequence numbers");
    // In seq_per_batch_ mode, gaps in the sequence numbers are possible, so
    // strict mode should be disabled.
    return SeekToStartSequence(current_file_index_, !seq_per_batch_);
  }

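  // In seq_per_batch_ mode a batch can consume more than one sequence number
  // (prepare, commit, and non-empty noop markers each consume one), so replay
  // the batch through this handler to compute the last sequence number it
  // covers.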
  struct BatchCounter : public WriteBatch::Handler {
    SequenceNumber sequence_;
    explicit BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
    Status MarkNoop(bool empty_batch) override {
      if (!empty_batch) {
        sequence_++;
      }
      return Status::OK();
    }
    Status MarkEndPrepare(const Slice&) override {
      sequence_++;
      return Status::OK();
    }
    Status MarkCommit(const Slice&) override {
      sequence_++;
      return Status::OK();
    }

    Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
                 const Slice& /*val*/) override {
      return Status::OK();
    }
    Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
      return Status::OK();
    }
    Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
      return Status::OK();
    }
    Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
                   const Slice& /*val*/) override {
      return Status::OK();
    }
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
    Status MarkRollback(const Slice&) override { return Status::OK(); }
  };

  current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
  if (seq_per_batch_) {
    BatchCounter counter(current_batch_seq_);
    batch->Iterate(&counter);
    current_last_seq_ = counter.sequence_;
  } else {
    current_last_seq_ =
        current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
  }
  // current_batch_seq_ can only change here
  assert(current_last_seq_ <= versions_->LastSequence());

  current_batch_ = std::move(batch);
  is_valid_ = true;
  current_status_ = Status::OK();
}

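// Replaces current_log_reader_ with a log::Reader positioned at the start of
// `log_file`.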
Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
  std::unique_ptr<SequentialFileReader> file;
  Status s = OpenLogFile(log_file, &file);
  if (!s.ok()) {
    return s;
  }
  assert(file);
  current_log_reader_.reset(
      new log::Reader(options_->info_log, std::move(file), &reporter_,
                      read_options_.verify_checksums_, log_file->LogNumber()));
  return Status::OK();
}
}  // namespace ROCKSDB_NAMESPACE
#endif  // ROCKSDB_LITE