1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/log_reader.h"
11 #include "db/log_writer.h"
12 #include "env/composite_env_wrapper.h"
13 #include "file/sequence_file_reader.h"
14 #include "file/writable_file_writer.h"
15 #include "rocksdb/env.h"
16 #include "test_util/testharness.h"
17 #include "test_util/testutil.h"
18 #include "util/coding.h"
19 #include "util/crc32c.h"
20 #include "util/random.h"
21 
22 namespace ROCKSDB_NAMESPACE {
23 namespace log {
24 
// Construct a string of exactly `n` bytes by repeating `partial_string` and
// truncating the final repetition if necessary.
//
// If `partial_string` is empty there is nothing to repeat; the result is then
// `n` NUL bytes (the padding produced by std::string::resize) instead of
// looping forever.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  if (!partial_string.empty()) {
    // The loop may overshoot `n` by at most one repetition.
    result.reserve(n + partial_string.size());
    while (result.size() < n) {
      result.append(partial_string);
    }
  }
  result.resize(n);
  return result;
}
35 
// Format `n` in decimal followed by a '.', e.g. 42 -> "42.".
static std::string NumberString(int n) {
  char digits[50];
  const int written = snprintf(digits, sizeof(digits), "%d.", n);
  // A decimal int plus '.' always fits, so `written` is the full length.
  return std::string(digits, static_cast<size_t>(written));
}
42 
43 // Return a skewed potentially long string
RandomSkewedString(int i,Random * rnd)44 static std::string RandomSkewedString(int i, Random* rnd) {
45   return BigString(NumberString(i), rnd->Skewed(17));
46 }
47 
48 // Param type is tuple<int, bool>
49 // get<0>(tuple): non-zero if recycling log, zero if regular log
50 // get<1>(tuple): true if allow retry after read EOF, false otherwise
51 class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
52  private:
53   class StringSource : public SequentialFile {
54    public:
55     Slice& contents_;
56     bool force_error_;
57     size_t force_error_position_;
58     bool force_eof_;
59     size_t force_eof_position_;
60     bool returned_partial_;
61     bool fail_after_read_partial_;
StringSource(Slice & contents,bool fail_after_read_partial)62     explicit StringSource(Slice& contents, bool fail_after_read_partial)
63         : contents_(contents),
64           force_error_(false),
65           force_error_position_(0),
66           force_eof_(false),
67           force_eof_position_(0),
68           returned_partial_(false),
69           fail_after_read_partial_(fail_after_read_partial) {}
70 
Read(size_t n,Slice * result,char * scratch)71     Status Read(size_t n, Slice* result, char* scratch) override {
72       if (fail_after_read_partial_) {
73         EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
74       }
75 
76       if (force_error_) {
77         if (force_error_position_ >= n) {
78           force_error_position_ -= n;
79         } else {
80           *result = Slice(contents_.data(), force_error_position_);
81           contents_.remove_prefix(force_error_position_);
82           force_error_ = false;
83           returned_partial_ = true;
84           return Status::Corruption("read error");
85         }
86       }
87 
88       if (contents_.size() < n) {
89         n = contents_.size();
90         returned_partial_ = true;
91       }
92 
93       if (force_eof_) {
94         if (force_eof_position_ >= n) {
95           force_eof_position_ -= n;
96         } else {
97           force_eof_ = false;
98           n = force_eof_position_;
99           returned_partial_ = true;
100         }
101       }
102 
103       // By using scratch we ensure that caller has control over the
104       // lifetime of result.data()
105       memcpy(scratch, contents_.data(), n);
106       *result = Slice(scratch, n);
107 
108       contents_.remove_prefix(n);
109       return Status::OK();
110     }
111 
Skip(uint64_t n)112     Status Skip(uint64_t n) override {
113       if (n > contents_.size()) {
114         contents_.clear();
115         return Status::NotFound("in-memory file skipepd past end");
116       }
117 
118       contents_.remove_prefix(n);
119 
120       return Status::OK();
121     }
122   };
123 
GetStringSourceFromLegacyReader(SequentialFileReader * reader)124   inline StringSource* GetStringSourceFromLegacyReader(
125       SequentialFileReader* reader) {
126     LegacySequentialFileWrapper* file =
127         static_cast<LegacySequentialFileWrapper*>(reader->file());
128     return static_cast<StringSource*>(file->target());
129   }
130 
131   class ReportCollector : public Reader::Reporter {
132    public:
133     size_t dropped_bytes_;
134     std::string message_;
135 
ReportCollector()136     ReportCollector() : dropped_bytes_(0) { }
Corruption(size_t bytes,const Status & status)137     void Corruption(size_t bytes, const Status& status) override {
138       dropped_bytes_ += bytes;
139       message_.append(status.ToString());
140     }
141   };
142 
dest_contents()143   std::string& dest_contents() {
144     auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
145     assert(dest);
146     return dest->contents_;
147   }
148 
dest_contents() const149   const std::string& dest_contents() const {
150     auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
151     assert(dest);
152     return dest->contents_;
153   }
154 
reset_source_contents()155   void reset_source_contents() {
156     auto src = GetStringSourceFromLegacyReader(reader_->file());
157     assert(src);
158     src->contents_ = dest_contents();
159   }
160 
161   Slice reader_contents_;
162   std::unique_ptr<WritableFileWriter> dest_holder_;
163   std::unique_ptr<SequentialFileReader> source_holder_;
164   ReportCollector report_;
165   Writer writer_;
166   std::unique_ptr<Reader> reader_;
167 
168  protected:
169   bool allow_retry_read_;
170 
171  public:
LogTest()172   LogTest()
173       : reader_contents_(),
174         dest_holder_(test::GetWritableFileWriter(
175             new test::StringSink(&reader_contents_), "" /* don't care */)),
176         source_holder_(test::GetSequentialFileReader(
177             new StringSource(reader_contents_, !std::get<1>(GetParam())),
178             "" /* file name */)),
179         writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
180         allow_retry_read_(std::get<1>(GetParam())) {
181     if (allow_retry_read_) {
182       reader_.reset(new FragmentBufferedReader(
183           nullptr, std::move(source_holder_), &report_, true /* checksum */,
184           123 /* log_number */));
185     } else {
186       reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
187                                true /* checksum */, 123 /* log_number */));
188     }
189   }
190 
get_reader_contents()191   Slice* get_reader_contents() { return &reader_contents_; }
192 
Write(const std::string & msg)193   void Write(const std::string& msg) {
194     writer_.AddRecord(Slice(msg));
195   }
196 
WrittenBytes() const197   size_t WrittenBytes() const {
198     return dest_contents().size();
199   }
200 
Read(const WALRecoveryMode wal_recovery_mode=WALRecoveryMode::kTolerateCorruptedTailRecords)201   std::string Read(const WALRecoveryMode wal_recovery_mode =
202                        WALRecoveryMode::kTolerateCorruptedTailRecords) {
203     std::string scratch;
204     Slice record;
205     bool ret = false;
206     ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
207     if (ret) {
208       return record.ToString();
209     } else {
210       return "EOF";
211     }
212   }
213 
IncrementByte(int offset,char delta)214   void IncrementByte(int offset, char delta) {
215     dest_contents()[offset] += delta;
216   }
217 
SetByte(int offset,char new_byte)218   void SetByte(int offset, char new_byte) {
219     dest_contents()[offset] = new_byte;
220   }
221 
ShrinkSize(int bytes)222   void ShrinkSize(int bytes) {
223     auto dest = test::GetStringSinkFromLegacyWriter(writer_.file());
224     assert(dest);
225     dest->Drop(bytes);
226   }
227 
FixChecksum(int header_offset,int len,bool recyclable)228   void FixChecksum(int header_offset, int len, bool recyclable) {
229     // Compute crc of type/len/data
230     int header_size = recyclable ? kRecyclableHeaderSize : kHeaderSize;
231     uint32_t crc = crc32c::Value(&dest_contents()[header_offset + 6],
232                                  header_size - 6 + len);
233     crc = crc32c::Mask(crc);
234     EncodeFixed32(&dest_contents()[header_offset], crc);
235   }
236 
ForceError(size_t position=0)237   void ForceError(size_t position = 0) {
238     auto src = GetStringSourceFromLegacyReader(reader_->file());
239     src->force_error_ = true;
240     src->force_error_position_ = position;
241   }
242 
DroppedBytes() const243   size_t DroppedBytes() const {
244     return report_.dropped_bytes_;
245   }
246 
ReportMessage() const247   std::string ReportMessage() const {
248     return report_.message_;
249   }
250 
ForceEOF(size_t position=0)251   void ForceEOF(size_t position = 0) {
252     auto src = GetStringSourceFromLegacyReader(reader_->file());
253     src->force_eof_ = true;
254     src->force_eof_position_ = position;
255   }
256 
UnmarkEOF()257   void UnmarkEOF() {
258     auto src = GetStringSourceFromLegacyReader(reader_->file());
259     src->returned_partial_ = false;
260     reader_->UnmarkEOF();
261   }
262 
IsEOF()263   bool IsEOF() { return reader_->IsEOF(); }
264 
265   // Returns OK iff recorded error message contains "msg"
MatchError(const std::string & msg) const266   std::string MatchError(const std::string& msg) const {
267     if (report_.message_.find(msg) == std::string::npos) {
268       return report_.message_;
269     } else {
270       return "OK";
271     }
272   }
273 };
274 
// A log with nothing written reports "EOF" on the very first read.
TEST_P(LogTest, Empty) {
  const std::string first_read = Read();
  ASSERT_EQ("EOF", first_read);
}
276 
// Small records, including an empty one, come back in write order; further
// reads keep returning "EOF".
TEST_P(LogTest, ReadWrite) {
  const char* const kRecords[] = {"foo", "bar", "", "xxxx"};
  for (const char* rec : kRecords) {
    Write(rec);
  }
  for (const char* rec : kRecords) {
    ASSERT_EQ(std::string(rec), Read());
  }
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}
289 
// Writes enough short records to span many blocks and reads them all back.
TEST_P(LogTest, ManyBlocks) {
  const int kNumRecords = 100000;
  int next = 0;
  while (next < kNumRecords) {
    Write(NumberString(next));
    ++next;
  }
  next = 0;
  while (next < kNumRecords) {
    ASSERT_EQ(NumberString(next), Read());
    ++next;
  }
  ASSERT_EQ("EOF", Read());
}
299 
// Records larger than one block are reassembled from their fragments.
TEST_P(LogTest, Fragmentation) {
  const std::string medium = BigString("medium", 50000);
  const std::string large = BigString("large", 100000);

  Write("small");
  Write(medium);
  Write(large);

  ASSERT_EQ("small", Read());
  ASSERT_EQ(medium, Read());
  ASSERT_EQ(large, Read());
  ASSERT_EQ("EOF", Read());
}
309 
// The first record leaves exactly one header's worth of space in the block,
// so the following empty record fits in the trailer exactly.
TEST_P(LogTest, MarginalTrailer) {
  // Make a trailer that is exactly the same length as an empty record.
  const bool recycling = std::get<0>(GetParam()) != 0;
  const int header_size = recycling ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size;
  const std::string first = BigString("foo", n);

  Write(first);
  ASSERT_EQ(static_cast<unsigned int>(kBlockSize - header_size),
            WrittenBytes());
  Write("");
  Write("bar");

  ASSERT_EQ(first, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
324 
// Same layout as MarginalTrailer, but the next record is non-empty and so
// cannot fit in the header-sized trailer. Nothing may be reported as dropped.
TEST_P(LogTest, MarginalTrailer2) {
  // Make a trailer that is exactly the same length as an empty record.
  const bool recycling = std::get<0>(GetParam()) != 0;
  const int header_size = recycling ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size;
  const std::string first = BigString("foo", n);

  Write(first);
  ASSERT_EQ(static_cast<unsigned int>(kBlockSize - header_size),
            WrittenBytes());
  Write("bar");

  ASSERT_EQ(first, Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
339 
// The first record leaves a trailer four bytes shorter than a header, so no
// record can start in the remainder of the block.
TEST_P(LogTest, ShortTrailer) {
  const bool recycling = std::get<0>(GetParam()) != 0;
  const int header_size = recycling ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size + 4;
  const std::string first = BigString("foo", n);

  Write(first);
  ASSERT_EQ(static_cast<unsigned int>(kBlockSize - header_size + 4),
            WrittenBytes());
  Write("");
  Write("bar");

  ASSERT_EQ(first, Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
353 
// The log ends with a trailer shorter than a header; reading past the only
// record must report "EOF".
TEST_P(LogTest, AlignedEof) {
  const bool recycling = std::get<0>(GetParam()) != 0;
  const int header_size = recycling ? kRecyclableHeaderSize : kHeaderSize;
  const int n = kBlockSize - 2 * header_size + 4;
  const std::string only_record = BigString("foo", n);

  Write(only_record);
  ASSERT_EQ(static_cast<unsigned int>(kBlockSize - header_size + 4),
            WrittenBytes());

  ASSERT_EQ(only_record, Read());
  ASSERT_EQ("EOF", Read());
}
363 
// Writes records of skewed random length, then regenerates the same sequence
// from an identically seeded generator to verify what is read back.
TEST_P(LogTest, RandomRead) {
  const int kNumRecords = 500;
  const uint32_t kSeed = 301;

  Random write_rnd(kSeed);
  for (int idx = 0; idx < kNumRecords; ++idx) {
    Write(RandomSkewedString(idx, &write_rnd));
  }

  Random read_rnd(kSeed);
  for (int idx = 0; idx < kNumRecords; ++idx) {
    const std::string expected = RandomSkewedString(idx, &read_rnd);
    ASSERT_EQ(expected, Read());
  }
  ASSERT_EQ("EOF", Read());
}
376 
377 // Tests of all the error paths in log_reader.cc follow:
378 
// An error injected on the first read is reported with a whole block counted
// as dropped, and no record is returned.
TEST_P(LogTest, ReadError) {
  Write("foo");
  ForceError();

  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  ASSERT_EQ(static_cast<unsigned int>(kBlockSize), DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
386 
// A record whose type byte is bumped to an unknown value (with the checksum
// repaired) is dropped and reported as "unknown record type".
TEST_P(LogTest, BadRecordType) {
  // Type is stored in header[6]
  const int kTypeOffset = 6;
  const int kPayloadLen = 3;  // strlen("foo")

  Write("foo");
  IncrementByte(kTypeOffset, 100);
  FixChecksum(0, kPayloadLen, false);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}
396 
// With the default recovery mode, a last record cut off inside its header is
// skipped silently: nothing is dropped and nothing is reported.
TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  // Drop all payload as well as a header byte
  ShrinkSize(4);

  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  // Truncated last record is ignored, not treated as an error
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
405 
// Under kAbsoluteConsistency, a last record cut off inside its header is
// reported as a corruption instead of being skipped silently.
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then truncated trailing record should not
    // raise an error.
    return;
  }
  Write("foo");
  // Drop all payload as well as a header byte
  ShrinkSize(4);

  const std::string got = Read(WALRecoveryMode::kAbsoluteConsistency);
  ASSERT_EQ("EOF", got);
  // In this mode the truncated last record is treated as an error.
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
419 
// The first record fills a block exactly and then has its length field
// bumped by one. A regular log drops that block and still returns the next
// record; a recycled log just reports "EOF".
TEST_P(LogTest, BadLength) {
  if (allow_retry_read_) {
    // If read retry is allowed, then we should not raise an error when the
    // record length specified in header is longer than data currently
    // available. It's possible that the body of the record is not written yet.
    return;
  }
  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  const int header_size =
      recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  const int kPayloadSize = kBlockSize - header_size;

  Write(BigString("bar", kPayloadSize));
  Write("foo");
  // Least significant size byte is stored in header[4].
  IncrementByte(4, 1);

  if (recyclable_log) {
    ASSERT_EQ("EOF", Read());
    return;
  }
  ASSERT_EQ("foo", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("bad record length"));
}
442 
// With the default recovery mode, a last record that is missing its final
// payload byte is skipped without any report.
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then we should not raise an error when the
    // record length specified in header is longer than data currently
    // available. It's possible that the body of the record is not written yet.
    return;
  }
  Write("foo");
  ShrinkSize(1);

  const std::string got = Read();
  ASSERT_EQ("EOF", got);
  ASSERT_EQ(0U, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
456 
// Under kAbsoluteConsistency, a last record that is missing its final
// payload byte is reported as a corruption.
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then we should not raise an error when the
    // record length specified in header is longer than data currently
    // available. It's possible that the body of the record is not written yet.
    return;
  }
  Write("foo");
  ShrinkSize(1);

  const std::string got = Read(WALRecoveryMode::kAbsoluteConsistency);
  ASSERT_EQ("EOF", got);
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: truncated header"));
}
470 
// Corrupting the stored checksum makes the record unreadable. A regular log
// reports the mismatch and drops 14 bytes; a recycled log reports nothing.
TEST_P(LogTest, ChecksumMismatch) {
  Write("foooooo");
  IncrementByte(0, 14);
  ASSERT_EQ("EOF", Read());

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (recyclable_log) {
    ASSERT_EQ(0U, DroppedBytes());
    ASSERT_EQ("", ReportMessage());
  } else {
    ASSERT_EQ(14U, DroppedBytes());
    ASSERT_EQ("OK", MatchError("checksum mismatch"));
  }
}
484 
// A lone record retyped as a "middle" fragment has no preceding "first"
// fragment, so it is dropped with a "missing start" report.
TEST_P(LogTest, UnexpectedMiddleType) {
  Write("foo");

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  char middle_type;
  if (recyclable_log) {
    middle_type = static_cast<char>(kRecyclableMiddleType);
  } else {
    middle_type = static_cast<char>(kMiddleType);
  }
  SetByte(6, middle_type);
  FixChecksum(0, 3, recyclable_log);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
495 
// A lone record retyped as a "last" fragment has no preceding "first"
// fragment, so it is dropped with a "missing start" report.
TEST_P(LogTest, UnexpectedLastType) {
  Write("foo");

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  char last_type;
  if (recyclable_log) {
    last_type = static_cast<char>(kRecyclableLastType);
  } else {
    last_type = static_cast<char>(kLastType);
  }
  SetByte(6, last_type);
  FixChecksum(0, 3, recyclable_log);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
506 
// The first record is retyped as a "first" fragment, so the complete record
// that follows arrives while a partial record is still open. The partial one
// is dropped and the complete one is returned.
TEST_P(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  char first_type;
  if (recyclable_log) {
    first_type = static_cast<char>(kRecyclableFirstType);
  } else {
    first_type = static_cast<char>(kFirstType);
  }
  SetByte(6, first_type);
  FixChecksum(0, 3, recyclable_log);

  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
519 
// The first record is retyped as a "first" fragment and is followed by a
// large multi-fragment record. The dangling partial record is dropped and
// the large record is returned intact.
TEST_P(LogTest, UnexpectedFirstType) {
  const std::string big_record = BigString("bar", 100000);
  Write("foo");
  Write(big_record);

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  char first_type;
  if (recyclable_log) {
    first_type = static_cast<char>(kRecyclableFirstType);
  } else {
    first_type = static_cast<char>(kFirstType);
  }
  SetByte(6, first_type);
  FixChecksum(0, 3, recyclable_log);

  ASSERT_EQ(big_record, Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
532 
// With the default recovery mode, a fragmented record whose final fragment
// is missing is skipped without any report.
TEST_P(LogTest, MissingLastIsIgnored) {
  const std::string spanning = BigString("bar", kBlockSize);
  Write(spanning);
  // Remove the LAST block, including header.
  ShrinkSize(14);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0U, DroppedBytes());
}
541 
// Under kAbsoluteConsistency, a fragmented record whose final fragment is
// missing is reported as a corruption.
TEST_P(LogTest, MissingLastIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then truncated trailing record should not
    // raise an error.
    return;
  }
  const std::string spanning = BigString("bar", kBlockSize);
  Write(spanning);
  // Remove the LAST block, including header.
  ShrinkSize(14);

  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  ASSERT_GT(DroppedBytes(), 0U);
  ASSERT_EQ("OK", MatchError("Corruption: error reading trailing data"));
}
555 
// With the default recovery mode, a fragmented record whose final fragment
// is one byte short is skipped without any report.
TEST_P(LogTest, PartialLastIsIgnored) {
  const std::string spanning = BigString("bar", kBlockSize);
  Write(spanning);
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);

  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0U, DroppedBytes());
}
564 
// Under kAbsoluteConsistency, a fragmented record whose final fragment is
// one byte short produces two corruption reports, appended back to back.
TEST_P(LogTest, PartialLastIsNotIgnored) {
  if (allow_retry_read_) {
    // If read retry is allowed, then truncated trailing record should not
    // raise an error.
    return;
  }
  const std::string spanning = BigString("bar", kBlockSize);
  Write(spanning);
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);

  ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
  ASSERT_GT(DroppedBytes(), 0U);

  const std::string expected_reports =
      "Corruption: truncated headerCorruption: "
      "error reading trailing data";
  ASSERT_EQ("OK", MatchError(expected_reports));
}
580 
// Consider two fragmented records:
//    first(R1) last(R1) first(R2) last(R2)
// where the middle two fragments disappear.  We do not want
// first(R1),last(R2) to get joined and returned as a valid record.
TEST_P(LogTest, ErrorJoinsRecords) {
  // Write records that span two blocks
  Write(BigString("foo", kBlockSize));
  Write(BigString("bar", kBlockSize));
  Write("correct");

  // Wipe the middle block
  unsigned int offset = kBlockSize;
  while (offset < 2 * kBlockSize) {
    SetByte(offset, 'x');
    offset++;
  }

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (recyclable_log) {
    ASSERT_EQ("EOF", Read());
    return;
  }

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("EOF", Read());
  // Roughly two blocks' worth of data must have been reported as dropped.
  const size_t dropped = DroppedBytes();
  ASSERT_LE(dropped, 2 * kBlockSize + 100);
  ASSERT_GE(dropped, 2 * kBlockSize);
}
608 
// A short read is forced two bytes into the second record's header. After
// UnmarkEOF() the reader picks up the rest, and it also sees a record that
// was appended after it had reached EOF.
TEST_P(LogTest, ClearEofSingleBlock) {
  Write("foo");
  Write("bar");

  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  const int header_size =
      recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  // Payload of "foo", its header, plus two bytes of the next header.
  const int eof_offset = 3 + header_size + 2;
  ForceEOF(eof_offset);

  ASSERT_EQ("foo", Read());
  UnmarkEOF();
  ASSERT_EQ("bar", Read());
  ASSERT_TRUE(IsEOF());
  ASSERT_EQ("EOF", Read());

  Write("xxx");
  UnmarkEOF();
  ASSERT_EQ("xxx", Read());
  ASSERT_TRUE(IsEOF());
}
625 
// Same idea as ClearEofSingleBlock, but with records that span several
// blocks: a short read is forced three bytes into the second record.
TEST_P(LogTest, ClearEofMultiBlock) {
  size_t num_full_blocks = 5;
  const bool recyclable_log = (std::get<0>(GetParam()) != 0);
  int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
  size_t n = (kBlockSize - header_size) * num_full_blocks + 25;

  const std::string foo_record = BigString("foo", n);
  const std::string bar_record = BigString("bar", n);
  Write(foo_record);
  Write(bar_record);

  const size_t eof_offset =
      n + num_full_blocks * header_size + header_size + 3;
  ForceEOF(eof_offset);

  ASSERT_EQ(foo_record, Read());
  ASSERT_TRUE(IsEOF());
  UnmarkEOF();
  ASSERT_EQ(bar_record, Read());
  ASSERT_TRUE(IsEOF());

  const std::string late_record = BigString("xxx", n);
  Write(late_record);
  UnmarkEOF();
  ASSERT_EQ(late_record, Read());
  ASSERT_TRUE(IsEOF());
}
644 
// If an error occurs during Read() in UnmarkEOF(), the records contained
// in the buffer should be returned on subsequent calls of ReadRecord()
// until no more full records are left, whereafter ReadRecord() should return
// false to indicate that it cannot read any further.
TEST_P(LogTest, ClearEofError) {
  Write("foo");
  Write("bar");
  UnmarkEOF();

  const std::string first = Read();
  ASSERT_EQ("foo", first);
  ASSERT_TRUE(IsEOF());

  // Append a record, then make the very next source read fail.
  Write("xxx");
  ForceError(0);
  UnmarkEOF();

  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
662 
// Like ClearEofError, but the injected error strikes three bytes into the
// newly appended data. Those three bytes are reported as dropped.
TEST_P(LogTest, ClearEofError2) {
  Write("foo");
  Write("bar");
  UnmarkEOF();

  const std::string first = Read();
  ASSERT_EQ("foo", first);

  Write("xxx");
  ForceError(3);
  UnmarkEOF();

  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3U, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
676 
// Fills at least two blocks with records, then overwrites the start of the
// same bytes with a second recycling writer. Only the two records written by
// the second writer may be returned; the stale data behind them must read as
// "EOF".
TEST_P(LogTest, Recycle) {
  bool recyclable_log = (std::get<0>(GetParam()) != 0);
  if (!recyclable_log) {
    return;  // test is only valid for recycled logs
  }
  Write("foo");
  Write("bar");
  Write("baz");
  Write("bif");
  Write("blitz");
  while (get_reader_contents()->size() < log::kBlockSize * 2) {
    Write("xxxxxxxxxxxxxxxx");
  }
  std::unique_ptr<WritableFileWriter> dest_holder(test::GetWritableFileWriter(
      new test::OverwritingStringSink(get_reader_contents()),
      "" /* don't care */));
  Writer recycle_writer(std::move(dest_holder), 123, true);
  // A failed append would make the reads below meaningless, so check it.
  ASSERT_OK(recycle_writer.AddRecord(Slice("foooo")));
  ASSERT_OK(recycle_writer.AddRecord(Slice("bar")));
  // Overwriting must not have shrunk the underlying data.
  ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
  ASSERT_EQ("foooo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
}
701 
// Run every LogTest case for all four combinations of
// (recycling log: 0 or 1, allow retry after read EOF: false or true).
INSTANTIATE_TEST_CASE_P(bool, LogTest,
                        ::testing::Values(std::make_tuple(0, false),
                                          std::make_tuple(0, true),
                                          std::make_tuple(1, false),
                                          std::make_tuple(1, true)));
707 
708 class RetriableLogTest : public ::testing::TestWithParam<int> {
709  private:
710   class ReportCollector : public Reader::Reporter {
711    public:
712     size_t dropped_bytes_;
713     std::string message_;
714 
ReportCollector()715     ReportCollector() : dropped_bytes_(0) {}
Corruption(size_t bytes,const Status & status)716     void Corruption(size_t bytes, const Status& status) override {
717       dropped_bytes_ += bytes;
718       message_.append(status.ToString());
719     }
720   };
721 
722   Slice contents_;
723   std::unique_ptr<WritableFileWriter> dest_holder_;
724   std::unique_ptr<Writer> log_writer_;
725   Env* env_;
726   EnvOptions env_options_;
727   const std::string test_dir_;
728   const std::string log_file_;
729   std::unique_ptr<WritableFileWriter> writer_;
730   std::unique_ptr<SequentialFileReader> reader_;
731   ReportCollector report_;
732   std::unique_ptr<FragmentBufferedReader> log_reader_;
733 
734  public:
RetriableLogTest()735   RetriableLogTest()
736       : contents_(),
737         dest_holder_(nullptr),
738         log_writer_(nullptr),
739         env_(Env::Default()),
740         test_dir_(test::PerThreadDBPath("retriable_log_test")),
741         log_file_(test_dir_ + "/log"),
742         writer_(nullptr),
743         reader_(nullptr),
744         log_reader_(nullptr) {}
745 
SetupTestEnv()746   Status SetupTestEnv() {
747     dest_holder_.reset(test::GetWritableFileWriter(
748         new test::StringSink(&contents_), "" /* file name */));
749     assert(dest_holder_ != nullptr);
750     log_writer_.reset(new Writer(std::move(dest_holder_), 123, GetParam()));
751     assert(log_writer_ != nullptr);
752 
753     Status s;
754     s = env_->CreateDirIfMissing(test_dir_);
755     std::unique_ptr<WritableFile> writable_file;
756     if (s.ok()) {
757       s = env_->NewWritableFile(log_file_, &writable_file, env_options_);
758     }
759     if (s.ok()) {
760       writer_.reset(new WritableFileWriter(
761           NewLegacyWritableFileWrapper(std::move(writable_file)), log_file_,
762           env_options_));
763       assert(writer_ != nullptr);
764     }
765     std::unique_ptr<SequentialFile> seq_file;
766     if (s.ok()) {
767       s = env_->NewSequentialFile(log_file_, &seq_file, env_options_);
768     }
769     if (s.ok()) {
770       reader_.reset(new SequentialFileReader(
771           NewLegacySequentialFileWrapper(seq_file), log_file_));
772       assert(reader_ != nullptr);
773       log_reader_.reset(new FragmentBufferedReader(
774           nullptr, std::move(reader_), &report_, true /* checksum */,
775           123 /* log_number */));
776       assert(log_reader_ != nullptr);
777     }
778     return s;
779   }
780 
contents()781   std::string contents() {
782     auto file = test::GetStringSinkFromLegacyWriter(log_writer_->file());
783     assert(file != nullptr);
784     return file->contents_;
785   }
786 
Encode(const std::string & msg)787   void Encode(const std::string& msg) { log_writer_->AddRecord(Slice(msg)); }
788 
Write(const Slice & data)789   void Write(const Slice& data) {
790     writer_->Append(data);
791     writer_->Sync(true);
792   }
793 
TryRead(std::string * result)794   bool TryRead(std::string* result) {
795     assert(result != nullptr);
796     result->clear();
797     std::string scratch;
798     Slice record;
799     bool r = log_reader_->ReadRecord(&record, &scratch);
800     if (r) {
801       result->assign(record.data(), record.size());
802       return true;
803     } else {
804       return false;
805     }
806   }
807 };
808 
// A writer thread publishes a record in two pieces, the first of which stops
// one byte short of a complete header. The sync-point dependencies make the
// reader start only after the first piece is on disk, and hold back the
// second piece until the reader has hit its first EOF. The reader must
// nevertheless end up with the whole record.
TEST_P(RetriableLogTest, TailLog_PartialHeader) {
  ASSERT_OK(SetupTestEnv());
  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  bool eof = false;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"RetriableLogTest::TailLog:AfterPart1",
        "RetriableLogTest::TailLog:BeforeReadRecord"},
       {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
        "RetriableLogTest::TailLog:BeforePart2"}});
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "FragmentBufferedLogReader::TryReadMore:FirstEOF",
      [&](void* /*arg*/) { eof = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  size_t delta = header_size - 1;
  port::Thread log_writer_thread([&]() {
    const size_t old_sz = contents().size();
    Encode("foo");
    // Take one snapshot of the encoded bytes and split the new record.
    const std::string encoded = contents();
    const size_t new_sz = encoded.size();
    const std::string part1 = encoded.substr(old_sz, delta);
    const std::string part2 =
        encoded.substr(old_sz + delta, new_sz - old_sz - delta);
    Write(Slice(part1));
    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
    Write(Slice(part2));
  });

  std::string record;
  port::Thread log_reader_thread([&]() {
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
    // Spin until the complete record becomes readable.
    while (!TryRead(&record)) {
    }
  });
  log_reader_thread.join();
  log_writer_thread.join();

  // The callback captures the local `eof` by reference; unregister it before
  // this frame goes away so that no later test can trigger it.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  ASSERT_EQ("foo", record);
  ASSERT_TRUE(eof);
}
851 
// A writer thread publishes a record in two pieces, the first of which holds
// the complete header plus one payload byte. The sync-point dependencies
// make the reader start only after the first piece is on disk, and hold back
// the second piece until the reader has hit its first EOF. The reader must
// nevertheless end up with the whole record.
TEST_P(RetriableLogTest, TailLog_FullHeader) {
  ASSERT_OK(SetupTestEnv());
  size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  bool eof = false;
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->LoadDependency(
      {{"RetriableLogTest::TailLog:AfterPart1",
        "RetriableLogTest::TailLog:BeforeReadRecord"},
       {"FragmentBufferedLogReader::TryReadMore:FirstEOF",
        "RetriableLogTest::TailLog:BeforePart2"}});
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "FragmentBufferedLogReader::TryReadMore:FirstEOF",
      [&](void* /*arg*/) { eof = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  size_t delta = header_size + 1;
  port::Thread log_writer_thread([&]() {
    const size_t old_sz = contents().size();
    Encode("foo");
    // Take one snapshot of the encoded bytes and split the new record.
    const std::string encoded = contents();
    const size_t new_sz = encoded.size();
    const std::string part1 = encoded.substr(old_sz, delta);
    const std::string part2 =
        encoded.substr(old_sz + delta, new_sz - old_sz - delta);
    Write(Slice(part1));
    TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
    Write(Slice(part2));
  });

  std::string record;
  port::Thread log_reader_thread([&]() {
    TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
    // Spin until the complete record becomes readable.
    while (!TryRead(&record)) {
    }
  });
  log_reader_thread.join();
  log_writer_thread.join();

  // The callback captures the local `eof` by reference; unregister it before
  // this frame goes away so that no later test can trigger it.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  // Checked on the main thread (after the joins) so that a failure aborts
  // the test body rather than just returning from the writer lambda.
  ASSERT_TRUE(eof);
  ASSERT_EQ("foo", record);
}
894 
// Single-threaded variant: after only a partial header is on disk TryRead()
// must fail and leave the output empty; once the remainder is written the
// full record must be returned.
TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
  // Clear all sync point callbacks even if this test does not use sync point.
  // It is necessary, otherwise the execute of this test may hit a sync point
  // with which a callback is registered. The registered callback may access
  // some dead variable, causing segfault.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(SetupTestEnv());

  const size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
  const size_t split_at = header_size - 1;

  const size_t size_before = contents().size();
  Encode("foo-bar");
  const std::string encoded = contents();
  const size_t size_after = encoded.size();

  const std::string head = encoded.substr(size_before, split_at);
  const std::string tail = encoded.substr(size_before + split_at,
                                          size_after - size_before - split_at);

  std::string record;
  Write(Slice(head));
  ASSERT_FALSE(TryRead(&record));
  ASSERT_TRUE(record.empty());

  Write(Slice(tail));
  ASSERT_TRUE(TryRead(&record));
  ASSERT_EQ("foo-bar", record);
}
919 
// Run every RetriableLogTest case with parameter 0 and with parameter 2. The
// tests only check the parameter for being non-zero (to pick the recyclable
// header size) and pass it as the log writer's third constructor argument.
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
921 
922 }  // namespace log
923 }  // namespace ROCKSDB_NAMESPACE
924 
main(int argc,char ** argv)925 int main(int argc, char** argv) {
926   ::testing::InitGoogleTest(&argc, argv);
927   return RUN_ALL_TESTS();
928 }
929