1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "db/transaction_log_impl.h"
9 #include <cinttypes>
10 #include "db/write_batch_internal.h"
11 #include "file/sequence_file_reader.h"
12 
13 namespace ROCKSDB_NAMESPACE {
14 
TransactionLogIteratorImpl(const std::string & dir,const ImmutableDBOptions * options,const TransactionLogIterator::ReadOptions & read_options,const EnvOptions & soptions,const SequenceNumber seq,std::unique_ptr<VectorLogPtr> files,VersionSet const * const versions,const bool seq_per_batch)15 TransactionLogIteratorImpl::TransactionLogIteratorImpl(
16     const std::string& dir, const ImmutableDBOptions* options,
17     const TransactionLogIterator::ReadOptions& read_options,
18     const EnvOptions& soptions, const SequenceNumber seq,
19     std::unique_ptr<VectorLogPtr> files, VersionSet const* const versions,
20     const bool seq_per_batch)
21     : dir_(dir),
22       options_(options),
23       read_options_(read_options),
24       soptions_(soptions),
25       starting_sequence_number_(seq),
26       files_(std::move(files)),
27       started_(false),
28       is_valid_(false),
29       current_file_index_(0),
30       current_batch_seq_(0),
31       current_last_seq_(0),
32       versions_(versions),
33       seq_per_batch_(seq_per_batch) {
34   assert(files_ != nullptr);
35   assert(versions_ != nullptr);
36 
37   reporter_.env = options_->env;
38   reporter_.info_log = options_->info_log.get();
39   SeekToStartSequence(); // Seek till starting sequence
40 }
41 
OpenLogFile(const LogFile * log_file,std::unique_ptr<SequentialFileReader> * file_reader)42 Status TransactionLogIteratorImpl::OpenLogFile(
43     const LogFile* log_file,
44     std::unique_ptr<SequentialFileReader>* file_reader) {
45   FileSystem* fs = options_->fs.get();
46   std::unique_ptr<FSSequentialFile> file;
47   std::string fname;
48   Status s;
49   EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);
50   if (log_file->Type() == kArchivedLogFile) {
51     fname = ArchivedLogFileName(dir_, log_file->LogNumber());
52     s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
53   } else {
54     fname = LogFileName(dir_, log_file->LogNumber());
55     s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
56     if (!s.ok()) {
57       //  If cannot open file in DB directory.
58       //  Try the archive dir, as it could have moved in the meanwhile.
59       fname = ArchivedLogFileName(dir_, log_file->LogNumber());
60       s = fs->NewSequentialFile(fname, optimized_env_options,
61                                 &file, nullptr);
62     }
63   }
64   if (s.ok()) {
65     file_reader->reset(new SequentialFileReader(std::move(file), fname));
66   }
67   return s;
68 }
69 
GetBatch()70 BatchResult TransactionLogIteratorImpl::GetBatch()  {
71   assert(is_valid_);  //  cannot call in a non valid state.
72   BatchResult result;
73   result.sequence = current_batch_seq_;
74   result.writeBatchPtr = std::move(current_batch_);
75   return result;
76 }
77 
status()78 Status TransactionLogIteratorImpl::status() { return current_status_; }
79 
Valid()80 bool TransactionLogIteratorImpl::Valid() { return started_ && is_valid_; }
81 
RestrictedRead(Slice * record)82 bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
83   // Don't read if no more complete entries to read from logs
84   if (current_last_seq_ >= versions_->LastSequence()) {
85     return false;
86   }
87   return current_log_reader_->ReadRecord(record, &scratch_);
88 }
89 
SeekToStartSequence(uint64_t start_file_index,bool strict)90 void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
91                                                      bool strict) {
92   Slice record;
93   started_ = false;
94   is_valid_ = false;
95   if (files_->size() <= start_file_index) {
96     return;
97   }
98   Status s =
99       OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
100   if (!s.ok()) {
101     current_status_ = s;
102     reporter_.Info(current_status_.ToString().c_str());
103     return;
104   }
105   while (RestrictedRead(&record)) {
106     if (record.size() < WriteBatchInternal::kHeader) {
107       reporter_.Corruption(
108         record.size(), Status::Corruption("very small log record"));
109       continue;
110     }
111     UpdateCurrentWriteBatch(record);
112     if (current_last_seq_ >= starting_sequence_number_) {
113       if (strict && current_batch_seq_ != starting_sequence_number_) {
114         current_status_ = Status::Corruption(
115             "Gap in sequence number. Could not "
116             "seek to required sequence number");
117         reporter_.Info(current_status_.ToString().c_str());
118         return;
119       } else if (strict) {
120         reporter_.Info("Could seek required sequence number. Iterator will "
121                        "continue.");
122       }
123       is_valid_ = true;
124       started_ = true; // set started_ as we could seek till starting sequence
125       return;
126     } else {
127       is_valid_ = false;
128     }
129   }
130 
131   // Could not find start sequence in first file. Normally this must be the
132   // only file. Otherwise log the error and let the iterator return next entry
133   // If strict is set, we want to seek exactly till the start sequence and it
134   // should have been present in the file we scanned above
135   if (strict) {
136     current_status_ = Status::Corruption(
137         "Gap in sequence number. Could not "
138         "seek to required sequence number");
139     reporter_.Info(current_status_.ToString().c_str());
140   } else if (files_->size() != 1) {
141     current_status_ = Status::Corruption(
142         "Start sequence was not found, "
143         "skipping to the next available");
144     reporter_.Info(current_status_.ToString().c_str());
145     // Let NextImpl find the next available entry. started_ remains false
146     // because we don't want to check for gaps while moving to start sequence
147     NextImpl(true);
148   }
149 }
150 
Next()151 void TransactionLogIteratorImpl::Next() {
152   return NextImpl(false);
153 }
154 
NextImpl(bool internal)155 void TransactionLogIteratorImpl::NextImpl(bool internal) {
156   Slice record;
157   is_valid_ = false;
158   if (!internal && !started_) {
159     // Runs every time until we can seek to the start sequence
160     return SeekToStartSequence();
161   }
162   while(true) {
163     assert(current_log_reader_);
164     if (current_log_reader_->IsEOF()) {
165       current_log_reader_->UnmarkEOF();
166     }
167     while (RestrictedRead(&record)) {
168       if (record.size() < WriteBatchInternal::kHeader) {
169         reporter_.Corruption(
170           record.size(), Status::Corruption("very small log record"));
171         continue;
172       } else {
173         // started_ should be true if called by application
174         assert(internal || started_);
175         // started_ should be false if called internally
176         assert(!internal || !started_);
177         UpdateCurrentWriteBatch(record);
178         if (internal && !started_) {
179           started_ = true;
180         }
181         return;
182       }
183     }
184 
185     // Open the next file
186     if (current_file_index_ < files_->size() - 1) {
187       ++current_file_index_;
188       Status s = OpenLogReader(files_->at(current_file_index_).get());
189       if (!s.ok()) {
190         is_valid_ = false;
191         current_status_ = s;
192         return;
193       }
194     } else {
195       is_valid_ = false;
196       if (current_last_seq_ == versions_->LastSequence()) {
197         current_status_ = Status::OK();
198       } else {
199         const char* msg = "Create a new iterator to fetch the new tail.";
200         current_status_ = Status::TryAgain(msg);
201       }
202       return;
203     }
204   }
205 }
206 
IsBatchExpected(const WriteBatch * batch,const SequenceNumber expected_seq)207 bool TransactionLogIteratorImpl::IsBatchExpected(
208     const WriteBatch* batch, const SequenceNumber expected_seq) {
209   assert(batch);
210   SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
211   if (batchSeq != expected_seq) {
212     char buf[200];
213     snprintf(buf, sizeof(buf),
214              "Discontinuity in log records. Got seq=%" PRIu64
215              ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
216              ".Log iterator will reseek the correct batch.",
217              batchSeq, expected_seq, versions_->LastSequence());
218     reporter_.Info(buf);
219     return false;
220   }
221   return true;
222 }
223 
UpdateCurrentWriteBatch(const Slice & record)224 void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
225   std::unique_ptr<WriteBatch> batch(new WriteBatch());
226   WriteBatchInternal::SetContents(batch.get(), record);
227 
228   SequenceNumber expected_seq = current_last_seq_ + 1;
229   // If the iterator has started, then confirm that we get continuous batches
230   if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
231     // Seek to the batch having expected sequence number
232     if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
233       // Expected batch must lie in the previous log file
234       // Avoid underflow.
235       if (current_file_index_ != 0) {
236         current_file_index_--;
237       }
238     }
239     starting_sequence_number_ = expected_seq;
240     // currentStatus_ will be set to Ok if reseek succeeds
241     // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
242     // that allows gaps in the WAL since it will still skip over the gap.
243     current_status_ = Status::NotFound("Gap in sequence numbers");
244     // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
245     // should be disabled
246     return SeekToStartSequence(current_file_index_, !seq_per_batch_);
247   }
248 
249   struct BatchCounter : public WriteBatch::Handler {
250     SequenceNumber sequence_;
251     BatchCounter(SequenceNumber sequence) : sequence_(sequence) {}
252     Status MarkNoop(bool empty_batch) override {
253       if (!empty_batch) {
254         sequence_++;
255       }
256       return Status::OK();
257     }
258     Status MarkEndPrepare(const Slice&) override {
259       sequence_++;
260       return Status::OK();
261     }
262     Status MarkCommit(const Slice&) override {
263       sequence_++;
264       return Status::OK();
265     }
266 
267     Status PutCF(uint32_t /*cf*/, const Slice& /*key*/,
268                  const Slice& /*val*/) override {
269       return Status::OK();
270     }
271     Status DeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
272       return Status::OK();
273     }
274     Status SingleDeleteCF(uint32_t /*cf*/, const Slice& /*key*/) override {
275       return Status::OK();
276     }
277     Status MergeCF(uint32_t /*cf*/, const Slice& /*key*/,
278                    const Slice& /*val*/) override {
279       return Status::OK();
280     }
281     Status MarkBeginPrepare(bool) override { return Status::OK(); }
282     Status MarkRollback(const Slice&) override { return Status::OK(); }
283   };
284 
285   current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
286   if (seq_per_batch_) {
287     BatchCounter counter(current_batch_seq_);
288     batch->Iterate(&counter);
289     current_last_seq_ = counter.sequence_;
290   } else {
291     current_last_seq_ =
292         current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
293   }
294   // currentBatchSeq_ can only change here
295   assert(current_last_seq_ <= versions_->LastSequence());
296 
297   current_batch_ = std::move(batch);
298   is_valid_ = true;
299   current_status_ = Status::OK();
300 }
301 
OpenLogReader(const LogFile * log_file)302 Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* log_file) {
303   std::unique_ptr<SequentialFileReader> file;
304   Status s = OpenLogFile(log_file, &file);
305   if (!s.ok()) {
306     return s;
307   }
308   assert(file);
309   current_log_reader_.reset(
310       new log::Reader(options_->info_log, std::move(file), &reporter_,
311                       read_options_.verify_checksums_, log_file->LogNumber()));
312   return Status::OK();
313 }
314 }  // namespace ROCKSDB_NAMESPACE
315 #endif  // ROCKSDB_LITE
316