1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 10 #pragma once 11 #include <memory> 12 #include <stdint.h> 13 14 #include "db/log_format.h" 15 #include "file/sequence_file_reader.h" 16 #include "rocksdb/options.h" 17 #include "rocksdb/slice.h" 18 #include "rocksdb/status.h" 19 20 namespace ROCKSDB_NAMESPACE { 21 class Logger; 22 23 namespace log { 24 25 /** 26 * Reader is a general purpose log stream reader implementation. The actual job 27 * of reading from the device is implemented by the SequentialFile interface. 28 * 29 * Please see Writer for details on the file and record layout. 30 */ 31 class Reader { 32 public: 33 // Interface for reporting errors. 34 class Reporter { 35 public: 36 virtual ~Reporter(); 37 38 // Some corruption was detected. "size" is the approximate number 39 // of bytes dropped due to the corruption. 40 virtual void Corruption(size_t bytes, const Status& status) = 0; 41 }; 42 43 // Create a reader that will return log records from "*file". 44 // "*file" must remain live while this Reader is in use. 45 // 46 // If "reporter" is non-nullptr, it is notified whenever some data is 47 // dropped due to a detected corruption. "*reporter" must remain 48 // live while this Reader is in use. 49 // 50 // If "checksum" is true, verify checksums if available. 51 Reader(std::shared_ptr<Logger> info_log, 52 // @lint-ignore TXT2 T25377293 Grandfathered in 53 std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter, 54 bool checksum, uint64_t log_num); 55 // No copying allowed 56 Reader(const Reader&) = delete; 57 void operator=(const Reader&) = delete; 58 59 virtual ~Reader(); 60 61 // Read the next record into *record. Returns true if read 62 // successfully, false if we hit end of the input. May use 63 // "*scratch" as temporary storage. The contents filled in *record 64 // will only be valid until the next mutating operation on this 65 // reader or the next mutation to *scratch. 66 virtual bool ReadRecord(Slice* record, std::string* scratch, 67 WALRecoveryMode wal_recovery_mode = 68 WALRecoveryMode::kTolerateCorruptedTailRecords); 69 70 // Returns the physical offset of the last record returned by ReadRecord. 71 // 72 // Undefined before the first call to ReadRecord. 73 uint64_t LastRecordOffset(); 74 75 // returns true if the reader has encountered an eof condition. IsEOF()76 bool IsEOF() { 77 return eof_; 78 } 79 80 // returns true if the reader has encountered read error. hasReadError()81 bool hasReadError() const { return read_error_; } 82 83 // when we know more data has been written to the file. we can use this 84 // function to force the reader to look again in the file. 85 // Also aligns the file position indicator to the start of the next block 86 // by reading the rest of the data from the EOF position to the end of the 87 // block that was partially read. 88 virtual void UnmarkEOF(); 89 file()90 SequentialFileReader* file() { return file_.get(); } 91 GetReporter()92 Reporter* GetReporter() const { return reporter_; } 93 GetLogNumber()94 uint64_t GetLogNumber() const { return log_number_; } 95 GetReadOffset()96 size_t GetReadOffset() const { 97 return static_cast<size_t>(end_of_buffer_offset_); 98 } 99 100 protected: 101 std::shared_ptr<Logger> info_log_; 102 const std::unique_ptr<SequentialFileReader> file_; 103 Reporter* const reporter_; 104 bool const checksum_; 105 char* const backing_store_; 106 107 // Internal state variables used for reading records 108 Slice buffer_; 109 bool eof_; // Last Read() indicated EOF by returning < kBlockSize 110 bool read_error_; // Error occurred while reading from file 111 112 // Offset of the file position indicator within the last block when an 113 // EOF was detected. 114 size_t eof_offset_; 115 116 // Offset of the last record returned by ReadRecord. 117 uint64_t last_record_offset_; 118 // Offset of the first location past the end of buffer_. 119 uint64_t end_of_buffer_offset_; 120 121 // which log number this is 122 uint64_t const log_number_; 123 124 // Whether this is a recycled log file 125 bool recycled_; 126 127 // Extend record types with the following special values 128 enum { 129 kEof = kMaxRecordType + 1, 130 // Returned whenever we find an invalid physical record. 131 // Currently there are three situations in which this happens: 132 // * The record has an invalid CRC (ReadPhysicalRecord reports a drop) 133 // * The record is a 0-length record (No drop is reported) 134 kBadRecord = kMaxRecordType + 2, 135 // Returned when we fail to read a valid header. 136 kBadHeader = kMaxRecordType + 3, 137 // Returned when we read an old record from a previous user of the log. 138 kOldRecord = kMaxRecordType + 4, 139 // Returned when we get a bad record length 140 kBadRecordLen = kMaxRecordType + 5, 141 // Returned when we get a bad record checksum 142 kBadRecordChecksum = kMaxRecordType + 6, 143 }; 144 145 // Return type, or one of the preceding special values 146 unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size); 147 148 // Read some more 149 bool ReadMore(size_t* drop_size, int *error); 150 151 void UnmarkEOFInternal(); 152 153 // Reports dropped bytes to the reporter. 154 // buffer_ must be updated to remove the dropped bytes prior to invocation. 155 void ReportCorruption(size_t bytes, const char* reason); 156 void ReportDrop(size_t bytes, const Status& reason); 157 }; 158 159 class FragmentBufferedReader : public Reader { 160 public: FragmentBufferedReader(std::shared_ptr<Logger> info_log,std::unique_ptr<SequentialFileReader> && _file,Reporter * reporter,bool checksum,uint64_t log_num)161 FragmentBufferedReader(std::shared_ptr<Logger> info_log, 162 // @lint-ignore TXT2 T25377293 Grandfathered in 163 std::unique_ptr<SequentialFileReader>&& _file, 164 Reporter* reporter, bool checksum, uint64_t log_num) 165 : Reader(info_log, std::move(_file), reporter, checksum, log_num), 166 fragments_(), 167 in_fragmented_record_(false) {} ~FragmentBufferedReader()168 ~FragmentBufferedReader() override {} 169 bool ReadRecord(Slice* record, std::string* scratch, 170 WALRecoveryMode wal_recovery_mode = 171 WALRecoveryMode::kTolerateCorruptedTailRecords) override; 172 void UnmarkEOF() override; 173 174 private: 175 std::string fragments_; 176 bool in_fragmented_record_; 177 178 bool TryReadFragment(Slice* result, size_t* drop_size, 179 unsigned int* fragment_type_or_err); 180 181 bool TryReadMore(size_t* drop_size, int* error); 182 183 // No copy allowed 184 FragmentBufferedReader(const FragmentBufferedReader&); 185 void operator=(const FragmentBufferedReader&); 186 }; 187 188 } // namespace log 189 } // namespace ROCKSDB_NAMESPACE 190