1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #pragma once
11 #include <memory>
12 #include <stdint.h>
13 
14 #include "db/log_format.h"
15 #include "file/sequence_file_reader.h"
16 #include "rocksdb/options.h"
17 #include "rocksdb/slice.h"
18 #include "rocksdb/status.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 class Logger;
22 
23 namespace log {
24 
25 /**
26  * Reader is a general purpose log stream reader implementation. The actual job
27  * of reading from the device is implemented by the SequentialFile interface.
28  *
29  * Please see Writer for details on the file and record layout.
30  */
31 class Reader {
32  public:
33   // Interface for reporting errors.
34   class Reporter {
35    public:
36     virtual ~Reporter();
37 
38     // Some corruption was detected.  "size" is the approximate number
39     // of bytes dropped due to the corruption.
40     virtual void Corruption(size_t bytes, const Status& status) = 0;
41   };
42 
43   // Create a reader that will return log records from "*file".
44   // "*file" must remain live while this Reader is in use.
45   //
46   // If "reporter" is non-nullptr, it is notified whenever some data is
47   // dropped due to a detected corruption.  "*reporter" must remain
48   // live while this Reader is in use.
49   //
50   // If "checksum" is true, verify checksums if available.
51   Reader(std::shared_ptr<Logger> info_log,
52          // @lint-ignore TXT2 T25377293 Grandfathered in
53          std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
54          bool checksum, uint64_t log_num);
55   // No copying allowed
56   Reader(const Reader&) = delete;
57   void operator=(const Reader&) = delete;
58 
59   virtual ~Reader();
60 
61   // Read the next record into *record.  Returns true if read
62   // successfully, false if we hit end of the input.  May use
63   // "*scratch" as temporary storage.  The contents filled in *record
64   // will only be valid until the next mutating operation on this
65   // reader or the next mutation to *scratch.
66   virtual bool ReadRecord(Slice* record, std::string* scratch,
67                           WALRecoveryMode wal_recovery_mode =
68                               WALRecoveryMode::kTolerateCorruptedTailRecords);
69 
70   // Returns the physical offset of the last record returned by ReadRecord.
71   //
72   // Undefined before the first call to ReadRecord.
73   uint64_t LastRecordOffset();
74 
75   // returns true if the reader has encountered an eof condition.
IsEOF()76   bool IsEOF() {
77     return eof_;
78   }
79 
80   // returns true if the reader has encountered read error.
hasReadError()81   bool hasReadError() const { return read_error_; }
82 
83   // when we know more data has been written to the file. we can use this
84   // function to force the reader to look again in the file.
85   // Also aligns the file position indicator to the start of the next block
86   // by reading the rest of the data from the EOF position to the end of the
87   // block that was partially read.
88   virtual void UnmarkEOF();
89 
file()90   SequentialFileReader* file() { return file_.get(); }
91 
GetReporter()92   Reporter* GetReporter() const { return reporter_; }
93 
GetLogNumber()94   uint64_t GetLogNumber() const { return log_number_; }
95 
GetReadOffset()96   size_t GetReadOffset() const {
97     return static_cast<size_t>(end_of_buffer_offset_);
98   }
99 
100  protected:
101   std::shared_ptr<Logger> info_log_;
102   const std::unique_ptr<SequentialFileReader> file_;
103   Reporter* const reporter_;
104   bool const checksum_;
105   char* const backing_store_;
106 
107   // Internal state variables used for reading records
108   Slice buffer_;
109   bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
110   bool read_error_;   // Error occurred while reading from file
111 
112   // Offset of the file position indicator within the last block when an
113   // EOF was detected.
114   size_t eof_offset_;
115 
116   // Offset of the last record returned by ReadRecord.
117   uint64_t last_record_offset_;
118   // Offset of the first location past the end of buffer_.
119   uint64_t end_of_buffer_offset_;
120 
121   // which log number this is
122   uint64_t const log_number_;
123 
124   // Whether this is a recycled log file
125   bool recycled_;
126 
127   // Extend record types with the following special values
128   enum {
129     kEof = kMaxRecordType + 1,
130     // Returned whenever we find an invalid physical record.
131     // Currently there are three situations in which this happens:
132     // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
133     // * The record is a 0-length record (No drop is reported)
134     kBadRecord = kMaxRecordType + 2,
135     // Returned when we fail to read a valid header.
136     kBadHeader = kMaxRecordType + 3,
137     // Returned when we read an old record from a previous user of the log.
138     kOldRecord = kMaxRecordType + 4,
139     // Returned when we get a bad record length
140     kBadRecordLen = kMaxRecordType + 5,
141     // Returned when we get a bad record checksum
142     kBadRecordChecksum = kMaxRecordType + 6,
143   };
144 
145   // Return type, or one of the preceding special values
146   unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
147 
148   // Read some more
149   bool ReadMore(size_t* drop_size, int *error);
150 
151   void UnmarkEOFInternal();
152 
153   // Reports dropped bytes to the reporter.
154   // buffer_ must be updated to remove the dropped bytes prior to invocation.
155   void ReportCorruption(size_t bytes, const char* reason);
156   void ReportDrop(size_t bytes, const Status& reason);
157 };
158 
159 class FragmentBufferedReader : public Reader {
160  public:
FragmentBufferedReader(std::shared_ptr<Logger> info_log,std::unique_ptr<SequentialFileReader> && _file,Reporter * reporter,bool checksum,uint64_t log_num)161   FragmentBufferedReader(std::shared_ptr<Logger> info_log,
162                          // @lint-ignore TXT2 T25377293 Grandfathered in
163                          std::unique_ptr<SequentialFileReader>&& _file,
164                          Reporter* reporter, bool checksum, uint64_t log_num)
165       : Reader(info_log, std::move(_file), reporter, checksum, log_num),
166         fragments_(),
167         in_fragmented_record_(false) {}
~FragmentBufferedReader()168   ~FragmentBufferedReader() override {}
169   bool ReadRecord(Slice* record, std::string* scratch,
170                   WALRecoveryMode wal_recovery_mode =
171                       WALRecoveryMode::kTolerateCorruptedTailRecords) override;
172   void UnmarkEOF() override;
173 
174  private:
175   std::string fragments_;
176   bool in_fragmented_record_;
177 
178   bool TryReadFragment(Slice* result, size_t* drop_size,
179                        unsigned int* fragment_type_or_err);
180 
181   bool TryReadMore(size_t* drop_size, int* error);
182 
183   // No copy allowed
184   FragmentBufferedReader(const FragmentBufferedReader&);
185   void operator=(const FragmentBufferedReader&);
186 };
187 
188 }  // namespace log
189 }  // namespace ROCKSDB_NAMESPACE
190