1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
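// The introduction of SyncPoint effectively disabled building and running this
// test in Release builds, which is a pity, since it is a good test. Currently
// only TransactionLogIteratorRace depends on SyncPoint (it is guarded by
// NDEBUG below); the whole file is compiled only when ROCKSDB_LITE is not set.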
13 #if !defined(ROCKSDB_LITE)
14 
15 #include "db/db_test_util.h"
16 #include "port/stack_trace.h"
17 
18 namespace ROCKSDB_NAMESPACE {
19 
20 class DBTestXactLogIterator : public DBTestBase {
21  public:
DBTestXactLogIterator()22   DBTestXactLogIterator() : DBTestBase("/db_log_iter_test") {}
23 
OpenTransactionLogIter(const SequenceNumber seq)24   std::unique_ptr<TransactionLogIterator> OpenTransactionLogIter(
25       const SequenceNumber seq) {
26     std::unique_ptr<TransactionLogIterator> iter;
27     Status status = dbfull()->GetUpdatesSince(seq, &iter);
28     EXPECT_OK(status);
29     EXPECT_TRUE(iter->Valid());
30     return iter;
31   }
32 };
33 
34 namespace {
ReadRecords(std::unique_ptr<TransactionLogIterator> & iter,int & count)35 SequenceNumber ReadRecords(
36     std::unique_ptr<TransactionLogIterator>& iter,
37     int& count) {
38   count = 0;
39   SequenceNumber lastSequence = 0;
40   BatchResult res;
41   while (iter->Valid()) {
42     res = iter->GetBatch();
43     EXPECT_TRUE(res.sequence > lastSequence);
44     ++count;
45     lastSequence = res.sequence;
46     EXPECT_OK(iter->status());
47     iter->Next();
48   }
49   return res.sequence;
50 }
51 
ExpectRecords(const int expected_no_records,std::unique_ptr<TransactionLogIterator> & iter)52 void ExpectRecords(
53     const int expected_no_records,
54     std::unique_ptr<TransactionLogIterator>& iter) {
55   int num_records;
56   ReadRecords(iter, num_records);
57   ASSERT_EQ(num_records, expected_no_records);
58 }
59 }  // namespace
60 
// Writes records to two column families, checks that all of them are visible
// through a log iterator opened at sequence 0, then reopens the DB, writes
// more records and checks that records from before and after the reopen are
// all returned.
TEST_F(DBTestXactLogIterator, TransactionLogIterator) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(Put(0, "key1", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_OK(Put(1, "key2", DummyString(1024)));
    ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 3U);
    {
      auto iter = OpenTransactionLogIter(0);
      ExpectRecords(3, iter);
    }
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    env_->SleepForMicroseconds(2 * 1000 * 1000);
    {
      ASSERT_OK(Put(0, "key4", DummyString(1024)));
      ASSERT_OK(Put(1, "key5", DummyString(1024)));
      ASSERT_OK(Put(0, "key6", DummyString(1024)));
    }
    {
      auto iter = OpenTransactionLogIter(0);
      ExpectRecords(6, iter);
    }
  } while (ChangeCompactOptions());
}
87 
88 #ifndef NDEBUG  // sync point is not included with DNDEBUG build
TEST_F(DBTestXactLogIterator,TransactionLogIteratorRace)89 TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
90   static const int LOG_ITERATOR_RACE_TEST_COUNT = 2;
91   static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = {
92       {"WalManager::GetSortedWalFiles:1",  "WalManager::PurgeObsoleteFiles:1",
93        "WalManager::PurgeObsoleteFiles:2", "WalManager::GetSortedWalFiles:2"},
94       {"WalManager::GetSortedWalsOfType:1",
95        "WalManager::PurgeObsoleteFiles:1",
96        "WalManager::PurgeObsoleteFiles:2",
97        "WalManager::GetSortedWalsOfType:2"}};
98   for (int test = 0; test < LOG_ITERATOR_RACE_TEST_COUNT; ++test) {
99     // Setup sync point dependency to reproduce the race condition of
100     // a log file moved to archived dir, in the middle of GetSortedWalFiles
101     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
102         {sync_points[test][0], sync_points[test][1]},
103         {sync_points[test][2], sync_points[test][3]},
104     });
105 
106     do {
107       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
108       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
109       Options options = OptionsForLogIterTest();
110       DestroyAndReopen(options);
111       Put("key1", DummyString(1024));
112       dbfull()->Flush(FlushOptions());
113       Put("key2", DummyString(1024));
114       dbfull()->Flush(FlushOptions());
115       Put("key3", DummyString(1024));
116       dbfull()->Flush(FlushOptions());
117       Put("key4", DummyString(1024));
118       ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
119       dbfull()->FlushWAL(false);
120 
121       {
122         auto iter = OpenTransactionLogIter(0);
123         ExpectRecords(4, iter);
124       }
125 
126       ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
127       // trigger async flush, and log move. Well, log move will
128       // wait until the GetSortedWalFiles:1 to reproduce the race
129       // condition
130       FlushOptions flush_options;
131       flush_options.wait = false;
132       dbfull()->Flush(flush_options);
133 
134       // "key5" would be written in a new memtable and log
135       Put("key5", DummyString(1024));
136       dbfull()->FlushWAL(false);
137       {
138         // this iter would miss "key4" if not fixed
139         auto iter = OpenTransactionLogIter(0);
140         ExpectRecords(5, iter);
141       }
142     } while (ChangeCompactOptions());
143   }
144 }
145 #endif
146 
// Checks that an iterator which has run past the last record becomes valid
// again after a new record is written and Next() is called.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    ASSERT_OK(Put("key1", DummyString(1024)));
    auto iter = OpenTransactionLogIter(0);
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(!iter->Valid());
    ASSERT_OK(iter->status());
    ASSERT_OK(Put("key2", DummyString(1024)));
    iter->Next();
    ASSERT_OK(iter->status());
    ASSERT_TRUE(iter->Valid());
  } while (ChangeCompactOptions());
}
164 
// Checks that records written (and flushed) before a reopen are still
// returned by a log iterator opened from sequence 0 after the reopen.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    ASSERT_OK(Put("key1", DummyString(1024)));
    ASSERT_OK(Put("key2", DummyString(1023)));
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
    Reopen(options);
    auto iter = OpenTransactionLogIter(0);
    ExpectRecords(2, iter);
  } while (ChangeCompactOptions());
}
177 
// Truncates the oldest WAL file to half its size to create a gap, then checks
// that reading from sequence 0 stops before the gap and that reading from just
// past the last record read still reaches the record written afterwards.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    for (int i = 0; i < 1024; i++) {
      ASSERT_OK(Put("key" + ToString(i), DummyString(10)));
    }
    ASSERT_OK(dbfull()->Flush(FlushOptions()));
    ASSERT_OK(dbfull()->FlushWAL(false));
    // Corrupt this log to create a gap
    ROCKSDB_NAMESPACE::VectorLogPtr wal_files;
    ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
    // front() on an empty vector is undefined behavior; fail cleanly instead.
    ASSERT_FALSE(wal_files.empty());
    const auto logfile_path = dbname_ + "/" + wal_files.front()->PathName();
    if (mem_env_) {
      ASSERT_OK(mem_env_->Truncate(logfile_path,
                                   wal_files.front()->SizeFileBytes() / 2));
    } else {
      ASSERT_EQ(0, truncate(logfile_path.c_str(),
                            wal_files.front()->SizeFileBytes() / 2));
    }

    // Insert a new entry to a new log file
    ASSERT_OK(Put("key1025", DummyString(10)));
    ASSERT_OK(dbfull()->FlushWAL(false));
    // Try to read from the beginning. Should stop before the gap and read less
    // than 1025 entries
    auto iter = OpenTransactionLogIter(0);
    int count = 0;
    SequenceNumber last_sequence_read = ReadRecords(iter, count);
    ASSERT_LT(last_sequence_read, 1025U);
    // Try to read past the gap, should be able to seek to key1025
    auto iter2 = OpenTransactionLogIter(last_sequence_read + 1);
    ExpectRecords(1, iter2);
  } while (ChangeCompactOptions());
}
212 
// Writes one multi-operation WriteBatch across two column families, flushes
// and reopens, writes one more record, and checks that an iterator opened at
// sequence 3 returns exactly two batches.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBatchOperations) {
  do {
    Options options = OptionsForLogIterTest();
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    WriteBatch batch;
    ASSERT_OK(batch.Put(handles_[1], "key1", DummyString(1024)));
    ASSERT_OK(batch.Put(handles_[0], "key2", DummyString(1024)));
    ASSERT_OK(batch.Put(handles_[1], "key3", DummyString(1024)));
    ASSERT_OK(batch.Delete(handles_[0], "key2"));
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    ASSERT_OK(Flush(1));
    ASSERT_OK(Flush(0));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
    ASSERT_OK(Put(1, "key4", DummyString(1024)));
    auto iter = OpenTransactionLogIter(3);
    ExpectRecords(2, iter);
  } while (ChangeCompactOptions());
}
232 
// Writes a WriteBatch that interleaves Put/Delete with PutLogData blobs and
// checks that replaying the batch read back from the log visits every entry,
// including the blobs, in the original order.
TEST_F(DBTestXactLogIterator, TransactionLogIteratorBlobs) {
  Options options = OptionsForLogIterTest();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  {
    WriteBatch batch;
    ASSERT_OK(batch.Put(handles_[1], "key1", DummyString(1024)));
    ASSERT_OK(batch.Put(handles_[0], "key2", DummyString(1024)));
    ASSERT_OK(batch.PutLogData(Slice("blob1")));
    ASSERT_OK(batch.Put(handles_[1], "key3", DummyString(1024)));
    ASSERT_OK(batch.PutLogData(Slice("blob2")));
    ASSERT_OK(batch.Delete(handles_[0], "key2"));
    ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
    ReopenWithColumnFamilies({"default", "pikachu"}, options);
  }

  auto res = OpenTransactionLogIter(0)->GetBatch();
  // Serializes every callback it receives into `seen`, so the visit order can
  // be compared against a single expected string.
  struct Handler : public WriteBatch::Handler {
    std::string seen;
    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      seen += "Put(" + ToString(cf) + ", " + key.ToString() + ", " +
              ToString(value.size()) + ")";
      return Status::OK();
    }
    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      seen += "Merge(" + ToString(cf) + ", " + key.ToString() + ", " +
              ToString(value.size()) + ")";
      return Status::OK();
    }
    void LogData(const Slice& blob) override {
      seen += "LogData(" + blob.ToString() + ")";
    }
    Status DeleteCF(uint32_t cf, const Slice& key) override {
      seen += "Delete(" + ToString(cf) + ", " + key.ToString() + ")";
      return Status::OK();
    }
  } handler;
  ASSERT_OK(res.writeBatchPtr->Iterate(&handler));
  ASSERT_EQ(
      "Put(1, key1, 1024)"
      "Put(0, key2, 1024)"
      "LogData(blob1)"
      "Put(1, key3, 1024)"
      "LogData(blob2)"
      "Delete(0, key2)",
      handler.seen);
}
280 }  // namespace ROCKSDB_NAMESPACE
281 
282 #endif  // !defined(ROCKSDB_LITE)
283 
main(int argc,char ** argv)284 int main(int argc, char** argv) {
285 #if !defined(ROCKSDB_LITE)
286   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
287   ::testing::InitGoogleTest(&argc, argv);
288   return RUN_ALL_TESTS();
289 #else
290   (void) argc;
291   (void) argv;
292   return 0;
293 #endif
294 }
295