1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12 #include "rocksdb/perf_context.h"
13 #include "rocksdb/utilities/debug.h"
14 #include "table/block_based/block_based_table_reader.h"
15 #include "table/block_based/block_builder.h"
16 #include "test_util/fault_injection_test_env.h"
17 #if !defined(ROCKSDB_LITE)
18 #include "test_util/sync_point.h"
19 #endif
20 
21 namespace ROCKSDB_NAMESPACE {
22 class DBBasicTestWithTimestampBase : public DBTestBase {
23  public:
DBBasicTestWithTimestampBase(const std::string & dbname)24   explicit DBBasicTestWithTimestampBase(const std::string& dbname)
25       : DBTestBase(dbname) {}
26 
27  protected:
Key1(uint64_t k)28   static std::string Key1(uint64_t k) {
29     uint32_t x = 1;
30     const bool is_little_endian = (*reinterpret_cast<char*>(&x) != 0);
31     std::string ret;
32     if (is_little_endian) {
33       ret.assign(reinterpret_cast<char*>(&k), sizeof(k));
34     } else {
35       ret.resize(sizeof(k));
36       ret[0] = k & 0xff;
37       ret[1] = (k >> 8) & 0xff;
38       ret[2] = (k >> 16) & 0xff;
39       ret[3] = (k >> 24) & 0xff;
40       ret[4] = (k >> 32) & 0xff;
41       ret[5] = (k >> 40) & 0xff;
42       ret[6] = (k >> 48) & 0xff;
43       ret[7] = (k >> 56) & 0xff;
44     }
45     std::reverse(ret.begin(), ret.end());
46     return ret;
47   }
48 
49   class TestComparator : public Comparator {
50    private:
51     const Comparator* cmp_without_ts_;
52 
53    public:
TestComparator(size_t ts_sz)54     explicit TestComparator(size_t ts_sz)
55         : Comparator(ts_sz), cmp_without_ts_(nullptr) {
56       cmp_without_ts_ = BytewiseComparator();
57     }
58 
Name() const59     const char* Name() const override { return "TestComparator"; }
60 
FindShortSuccessor(std::string *) const61     void FindShortSuccessor(std::string*) const override {}
62 
FindShortestSeparator(std::string *,const Slice &) const63     void FindShortestSeparator(std::string*, const Slice&) const override {}
64 
Compare(const Slice & a,const Slice & b) const65     int Compare(const Slice& a, const Slice& b) const override {
66       int r = CompareWithoutTimestamp(a, b);
67       if (r != 0 || 0 == timestamp_size()) {
68         return r;
69       }
70       return -CompareTimestamp(
71           Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
72           Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
73     }
74 
75     using Comparator::CompareWithoutTimestamp;
CompareWithoutTimestamp(const Slice & a,bool a_has_ts,const Slice & b,bool b_has_ts) const76     int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
77                                 bool b_has_ts) const override {
78       if (a_has_ts) {
79         assert(a.size() >= timestamp_size());
80       }
81       if (b_has_ts) {
82         assert(b.size() >= timestamp_size());
83       }
84       Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a;
85       Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b;
86       return cmp_without_ts_->Compare(lhs, rhs);
87     }
88 
CompareTimestamp(const Slice & ts1,const Slice & ts2) const89     int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
90       if (!ts1.data() && !ts2.data()) {
91         return 0;
92       } else if (ts1.data() && !ts2.data()) {
93         return 1;
94       } else if (!ts1.data() && ts2.data()) {
95         return -1;
96       }
97       assert(ts1.size() == ts2.size());
98       uint64_t low1 = 0;
99       uint64_t low2 = 0;
100       uint64_t high1 = 0;
101       uint64_t high2 = 0;
102       const size_t kSize = ts1.size();
103       std::unique_ptr<char[]> ts1_buf(new char[kSize]);
104       memcpy(ts1_buf.get(), ts1.data(), ts1.size());
105       std::unique_ptr<char[]> ts2_buf(new char[kSize]);
106       memcpy(ts2_buf.get(), ts2.data(), ts2.size());
107       Slice ts1_copy = Slice(ts1_buf.get(), kSize);
108       Slice ts2_copy = Slice(ts2_buf.get(), kSize);
109       auto* ptr1 = const_cast<Slice*>(&ts1_copy);
110       auto* ptr2 = const_cast<Slice*>(&ts2_copy);
111       if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
112           !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
113         assert(false);
114       }
115       if (high1 < high2) {
116         return -1;
117       } else if (high1 > high2) {
118         return 1;
119       }
120       if (low1 < low2) {
121         return -1;
122       } else if (low1 > low2) {
123         return 1;
124       }
125       return 0;
126     }
127   };
128 
Timestamp(uint64_t low,uint64_t high)129   std::string Timestamp(uint64_t low, uint64_t high) {
130     std::string ts;
131     PutFixed64(&ts, low);
132     PutFixed64(&ts, high);
133     return ts;
134   }
135 
CheckIterUserEntry(const Iterator * it,const Slice & expected_key,const Slice & expected_value,const Slice & expected_ts) const136   void CheckIterUserEntry(const Iterator* it, const Slice& expected_key,
137                           const Slice& expected_value,
138                           const Slice& expected_ts) const {
139     ASSERT_TRUE(it->Valid());
140     ASSERT_OK(it->status());
141     ASSERT_EQ(expected_key, it->key());
142     ASSERT_EQ(expected_value, it->value());
143     ASSERT_EQ(expected_ts, it->timestamp());
144   }
145 
CheckIterEntry(const Iterator * it,const Slice & expected_ukey,SequenceNumber expected_seq,ValueType expected_val_type,const Slice & expected_value,const Slice & expected_ts)146   void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
147                       SequenceNumber expected_seq, ValueType expected_val_type,
148                       const Slice& expected_value, const Slice& expected_ts) {
149     ASSERT_TRUE(it->Valid());
150     ASSERT_OK(it->status());
151     std::string ukey_and_ts;
152     ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
153     ukey_and_ts.append(expected_ts.data(), expected_ts.size());
154     ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type);
155     std::string ikey;
156     AppendInternalKey(&ikey, parsed_ikey);
157     ASSERT_EQ(Slice(ikey), it->key());
158     if (expected_val_type == kTypeValue) {
159       ASSERT_EQ(expected_value, it->value());
160     }
161     ASSERT_EQ(expected_ts, it->timestamp());
162   }
163 };
164 
165 class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
166  public:
DBBasicTestWithTimestamp()167   DBBasicTestWithTimestamp()
168       : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
169 };
170 
TEST_F(DBBasicTestWithTimestamp,SimpleForwardIterate)171 TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
172   const int kNumKeysPerFile = 128;
173   const uint64_t kMaxKey = 1024;
174   Options options = CurrentOptions();
175   options.env = env_;
176   // TODO(yanqin) re-enable auto compaction
177   options.disable_auto_compactions = true;
178   options.create_if_missing = true;
179   const size_t kTimestampSize = Timestamp(0, 0).size();
180   TestComparator test_cmp(kTimestampSize);
181   options.comparator = &test_cmp;
182   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
183   DestroyAndReopen(options);
184   const std::vector<uint64_t> start_keys = {1, 0};
185   const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
186                                                      Timestamp(3, 0)};
187   const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
188                                                     Timestamp(4, 0)};
189   for (size_t i = 0; i < write_timestamps.size(); ++i) {
190     WriteOptions write_opts;
191     Slice write_ts = write_timestamps[i];
192     write_opts.timestamp = &write_ts;
193     for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
194       Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
195       ASSERT_OK(s);
196     }
197   }
198   for (size_t i = 0; i < read_timestamps.size(); ++i) {
199     ReadOptions read_opts;
200     Slice read_ts = read_timestamps[i];
201     read_opts.timestamp = &read_ts;
202     std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
203     int count = 0;
204     uint64_t key = 0;
205     for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
206          it->Next(), ++count, ++key) {
207       CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
208                          write_timestamps[i]);
209     }
210     size_t expected_count = kMaxKey - start_keys[i] + 1;
211     ASSERT_EQ(expected_count, count);
212 
213     // SeekToFirst() with lower bound.
214     // Then iter with lower and upper bounds.
215     uint64_t l = 0;
216     uint64_t r = kMaxKey + 1;
217     while (l < r) {
218       std::string lb_str = Key1(l);
219       Slice lb = lb_str;
220       std::string ub_str = Key1(r);
221       Slice ub = ub_str;
222       read_opts.iterate_lower_bound = &lb;
223       read_opts.iterate_upper_bound = &ub;
224       it.reset(db_->NewIterator(read_opts));
225       for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
226            it->Valid(); it->Next(), ++key, ++count) {
227         CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
228                            write_timestamps[i]);
229       }
230       ASSERT_EQ(r - std::max(l, start_keys[i]), count);
231       l += (kMaxKey / 100);
232       r -= (kMaxKey / 100);
233     }
234   }
235   Close();
236 }
237 
// Writes the same 1024 keys, ending at UINT64_MAX, in four rounds with
// timestamps (0,17), (2,17), (4,17) and (6,17). After each round it records
// the latest sequence number. Round r of the read phase sets iter_start_seqnum
// to round r's recorded sequence number and uses read timestamp (2r+3,17). The
// iterator is then expected to return exactly round r+1's entries, in key
// order, with consecutive sequence numbers.
TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 0xffffffffffffffff;
  const uint64_t kMinKey = kMaxKey - 1023;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  TestComparator test_cmp(Timestamp(0, 0).size());
  options.comparator = &test_cmp;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);

  const int kNumTimestamps = 4;
  std::vector<std::string> write_ts_list;
  for (int t = 0; t != kNumTimestamps; ++t) {
    write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
  }

  std::vector<SequenceNumber> start_seqs;
  WriteOptions write_opts;
  for (size_t round = 0; round != write_ts_list.size(); ++round) {
    Slice write_ts = write_ts_list[round];
    write_opts.timestamp = &write_ts;
    const std::string value = "value" + std::to_string(round);
    // Walk [kMinKey, kMaxKey] and stop on kMaxKey so `k` never wraps around.
    for (uint64_t k = kMinKey;; ++k) {
      ASSERT_OK(db_->Put(write_opts, Key1(k), value));
      if (k == kMaxKey) {
        break;
      }
    }
    start_seqs.push_back(db_->GetLatestSequenceNumber());
  }

  std::vector<std::string> read_ts_list;
  for (int t = 0; t + 1 != kNumTimestamps; ++t) {
    read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17));
  }

  ReadOptions read_opts;
  for (size_t round = 0; round != read_ts_list.size(); ++round) {
    Slice read_ts = read_ts_list[round];
    read_opts.timestamp = &read_ts;
    read_opts.iter_start_seqnum = start_seqs[round];
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    const std::string expected_value = "value" + std::to_string(round + 1);
    const std::string& expected_ts = write_ts_list[round + 1];
    SequenceNumber expected_seq = start_seqs[round] + 1;
    uint64_t key = kMinKey;
    iter->Seek(Key1(kMinKey));
    while (iter->Valid()) {
      CheckIterEntry(iter.get(), Key1(key), expected_seq, kTypeValue,
                     expected_value, expected_ts);
      ++key;
      ++expected_seq;
      iter->Next();
    }
  }
  Close();
}
295 
// Writes kNumKeys versions of "foo" with timestamps 1..kNumKeys. It then reads
// at timestamp 1 with max_sequential_skip_in_iterations set to kNumKeys / 2.
// The test expects the oldest value and exactly one reseek
// (NUMBER_OF_RESEEKS_IN_ITERATION == 1).
TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
  constexpr size_t kNumKeys = 16;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.max_sequential_skip_in_iterations = kNumKeys / 2;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  TestComparator test_cmp(Timestamp(0, 0).size());
  options.comparator = &test_cmp;
  DestroyAndReopen(options);

  // Version v (1-based) carries timestamp (v, 0) and value "value<v-1>".
  WriteOptions write_opts;
  for (size_t version = 1; version <= kNumKeys; ++version) {
    const std::string write_ts_str =
        Timestamp(static_cast<uint64_t>(version), 0);
    Slice write_ts = write_ts_str;
    write_opts.timestamp = &write_ts;
    ASSERT_OK(
        db_->Put(write_opts, "foo", "value" + std::to_string(version - 1)));
  }

  const std::string read_ts_str = Timestamp(1, 0);
  Slice read_ts = read_ts_str;
  ReadOptions read_opts;
  read_opts.timestamp = &read_ts;
  {
    // Scoped so the iterator is released before Close().
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->SeekToFirst();
    CheckIterUserEntry(iter.get(), "foo", "value0", read_ts_str);
    ASSERT_EQ(
        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  }
  Close();
}
332 
// Writes kNumKeys versions of "a" (timestamps 1..kNumKeys), then one batch
// that puts "a" and "b" at timestamp kNumKeys + 1. Reading at that timestamp,
// Next() from "a" must land on "b", and the test expects exactly one reseek
// given max_sequential_skip_in_iterations == kNumKeys / 2.
TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  constexpr size_t kNumKeys = 16;
  options.max_sequential_skip_in_iterations = kNumKeys / 2;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  DestroyAndReopen(options);
  // Write kNumKeys versions of "a" with timestamps 1..kNumKeys.
  WriteOptions write_opts;
  Status s;
  for (size_t i = 0; i != kNumKeys; ++i) {
    std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
    Slice ts = ts_str;
    write_opts.timestamp = &ts;
    s = db_->Put(write_opts, "a", "value" + std::to_string(i));
    ASSERT_OK(s);
  }
  // `ts` from the loop is gone, so write_opts must not keep pointing at it.
  // The batch below carries its own timestamps via AssignTimestamp().
  write_opts.timestamp = nullptr;
  {
    std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
    WriteBatch batch(0, 0, kTimestampSize);
    ASSERT_OK(batch.Put("a", "new_value"));
    ASSERT_OK(batch.Put("b", "new_value"));
    s = batch.AssignTimestamp(ts_str);
    ASSERT_OK(s);
    s = db_->Write(write_opts, &batch);
    ASSERT_OK(s);
  }
  {
    ReadOptions read_opts;
    std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
    Slice ts = ts_str;
    read_opts.timestamp = &ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->Seek("a");
    iter->Next();
    CheckIterUserEntry(iter.get(), "b", "new_value", ts_str);
    ASSERT_EQ(
        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  }
  Close();
}
380 
// Puts "a" at timestamp 1 and kNumKeys versions of "b" at timestamps
// 1..kNumKeys. Reading at timestamp 1 with max_skippable_internal_keys == 2,
// SeekToFirst() followed by Next() is expected to leave the iterator with an
// Incomplete status.
TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  TestComparator test_cmp(Timestamp(0, 0).size());
  options.comparator = &test_cmp;
  DestroyAndReopen(options);

  constexpr size_t max_skippable_internal_keys = 2;
  const size_t kNumKeys = max_skippable_internal_keys + 2;

  // Puts `key` -> `value` at timestamp (ts_low, 0).
  auto put_at = [this](uint64_t ts_low, const Slice& key,
                       const std::string& value) {
    const std::string ts_str = Timestamp(ts_low, 0);
    Slice ts = ts_str;
    WriteOptions write_opts;
    write_opts.timestamp = &ts;
    return db_->Put(write_opts, key, value);
  };

  ASSERT_OK(put_at(1, "a", "value"));
  for (size_t i = 0; i < kNumKeys; ++i) {
    ASSERT_OK(put_at(static_cast<uint64_t>(i + 1), "b",
                     "value" + std::to_string(i)));
  }

  {
    const std::string read_ts_str = Timestamp(1, 0);
    Slice read_ts = read_ts_str;
    ReadOptions read_opts;
    read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
    read_opts.timestamp = &read_ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->SeekToFirst();
    iter->Next();
    ASSERT_TRUE(iter->status().IsIncomplete());
  }
  Close();
}
419 
420 class DBBasicTestWithTimestampCompressionSettings
421     : public DBBasicTestWithTimestampBase,
422       public testing::WithParamInterface<std::tuple<
423           std::shared_ptr<const FilterPolicy>, CompressionType, uint32_t>> {
424  public:
DBBasicTestWithTimestampCompressionSettings()425   DBBasicTestWithTimestampCompressionSettings()
426       : DBBasicTestWithTimestampBase(
427             "db_basic_test_with_timestamp_compression") {}
428 };
429 
TEST_P(DBBasicTestWithTimestampCompressionSettings,PutAndGet)430 TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
431   const int kNumKeysPerFile = 1024;
432   const size_t kNumTimestamps = 4;
433   Options options = CurrentOptions();
434   options.create_if_missing = true;
435   options.env = env_;
436   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
437   size_t ts_sz = Timestamp(0, 0).size();
438   TestComparator test_cmp(ts_sz);
439   options.comparator = &test_cmp;
440   BlockBasedTableOptions bbto;
441   bbto.filter_policy = std::get<0>(GetParam());
442   bbto.whole_key_filtering = true;
443   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
444 
445   const CompressionType comp_type = std::get<1>(GetParam());
446 #if LZ4_VERSION_NUMBER < 10400  // r124+
447   if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
448     return;
449   }
450 #endif  // LZ4_VERSION_NUMBER >= 10400
451   if (!ZSTD_Supported() && comp_type == kZSTD) {
452     return;
453   }
454   if (!Zlib_Supported() && comp_type == kZlibCompression) {
455     return;
456   }
457 
458   options.compression = comp_type;
459   options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
460   if (comp_type == kZSTD) {
461     options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
462   }
463   options.target_file_size_base = 1 << 26;  // 64MB
464   DestroyAndReopen(options);
465   CreateAndReopenWithCF({"pikachu"}, options);
466   size_t num_cfs = handles_.size();
467   ASSERT_EQ(2, num_cfs);
468   std::vector<std::string> write_ts_list;
469   std::vector<std::string> read_ts_list;
470 
471   for (size_t i = 0; i != kNumTimestamps; ++i) {
472     write_ts_list.push_back(Timestamp(i * 2, 0));
473     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
474     const Slice write_ts = write_ts_list.back();
475     WriteOptions wopts;
476     wopts.timestamp = &write_ts;
477     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
478       for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
479         ASSERT_OK(Put(cf, Key1(j),
480                       "value_" + std::to_string(j) + "_" + std::to_string(i),
481                       wopts));
482       }
483     }
484   }
485   const auto& verify_db_func = [&]() {
486     for (size_t i = 0; i != kNumTimestamps; ++i) {
487       ReadOptions ropts;
488       const Slice read_ts = read_ts_list[i];
489       ropts.timestamp = &read_ts;
490       for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
491         ColumnFamilyHandle* cfh = handles_[cf];
492         for (size_t j = 0; j != (kNumKeysPerFile - 1) / kNumTimestamps; ++j) {
493           std::string value;
494           ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value));
495           ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
496                     value);
497         }
498       }
499     }
500   };
501   verify_db_func();
502   Close();
503 }
504 
505 #ifndef ROCKSDB_LITE
506 // A class which remembers the name of each flushed file.
507 class FlushedFileCollector : public EventListener {
508  public:
FlushedFileCollector()509   FlushedFileCollector() {}
~FlushedFileCollector()510   ~FlushedFileCollector() override {}
511 
OnFlushCompleted(DB *,const FlushJobInfo & info)512   void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
513     InstrumentedMutexLock lock(&mutex_);
514     flushed_files_.push_back(info.file_path);
515   }
516 
GetFlushedFiles()517   std::vector<std::string> GetFlushedFiles() {
518     std::vector<std::string> result;
519     {
520       InstrumentedMutexLock lock(&mutex_);
521       result = flushed_files_;
522     }
523     return result;
524   }
525 
ClearFlushedFiles()526   void ClearFlushedFiles() {
527     InstrumentedMutexLock lock(&mutex_);
528     flushed_files_.clear();
529   }
530 
531  private:
532   std::vector<std::string> flushed_files_;
533   InstrumentedMutex mutex_;
534 };
535 
TEST_P(DBBasicTestWithTimestampCompressionSettings,PutAndGetWithCompaction)536 TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
537   const int kNumKeysPerFile = 1024;
538   const size_t kNumTimestamps = 2;
539   const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
540   const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
541   Options options = CurrentOptions();
542   options.create_if_missing = true;
543   options.env = env_;
544   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
545 
546   FlushedFileCollector* collector = new FlushedFileCollector();
547   options.listeners.emplace_back(collector);
548 
549   size_t ts_sz = Timestamp(0, 0).size();
550   TestComparator test_cmp(ts_sz);
551   options.comparator = &test_cmp;
552   BlockBasedTableOptions bbto;
553   bbto.filter_policy = std::get<0>(GetParam());
554   bbto.whole_key_filtering = true;
555   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
556 
557   const CompressionType comp_type = std::get<1>(GetParam());
558 #if LZ4_VERSION_NUMBER < 10400  // r124+
559   if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
560     return;
561   }
562 #endif  // LZ4_VERSION_NUMBER >= 10400
563   if (!ZSTD_Supported() && comp_type == kZSTD) {
564     return;
565   }
566   if (!Zlib_Supported() && comp_type == kZlibCompression) {
567     return;
568   }
569 
570   options.compression = comp_type;
571   options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
572   if (comp_type == kZSTD) {
573     options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
574   }
575   DestroyAndReopen(options);
576   CreateAndReopenWithCF({"pikachu"}, options);
577 
578   size_t num_cfs = handles_.size();
579   ASSERT_EQ(2, num_cfs);
580   std::vector<std::string> write_ts_list;
581   std::vector<std::string> read_ts_list;
582 
583   const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
584                                        ColumnFamilyHandle* cfh) {
585     std::string value;
586     std::string timestamp;
587 
588     ReadOptions ropts;
589     const Slice read_ts = read_ts_list[i];
590     ropts.timestamp = &read_ts;
591     std::string expected_timestamp =
592         std::string(write_ts_list[i].data(), write_ts_list[i].size());
593 
594     for (size_t j = begin; j <= end; ++j) {
595       ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, &timestamp));
596       ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value);
597       ASSERT_EQ(expected_timestamp, timestamp);
598     }
599   };
600 
601   for (size_t i = 0; i != kNumTimestamps; ++i) {
602     write_ts_list.push_back(Timestamp(i * 2, 0));
603     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
604     const Slice write_ts = write_ts_list.back();
605     WriteOptions wopts;
606     wopts.timestamp = &write_ts;
607     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
608       size_t memtable_get_start = 0;
609       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
610         ASSERT_OK(Put(cf, Key1(j),
611                       "value_" + std::to_string(j) + "_" + std::to_string(i),
612                       wopts));
613         if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
614           verify_records_func(i, memtable_get_start, j, handles_[cf]);
615           memtable_get_start = j + 1;
616 
617           // flush all keys with the same timestamp to two sst files, split at
618           // incremental positions such that lowerlevel[1].smallest.userkey ==
619           // higherlevel[0].largest.userkey
620           ASSERT_OK(Flush(cf));
621 
622           // compact files (2 at each level) to a lower level such that all keys
623           // with the same timestamp is at one level, with newer versions at
624           // higher levels.
625           CompactionOptions compact_opt;
626           compact_opt.compression = kNoCompression;
627           db_->CompactFiles(compact_opt, handles_[cf],
628                             collector->GetFlushedFiles(),
629                             static_cast<int>(kNumTimestamps - i));
630           collector->ClearFlushedFiles();
631         }
632       }
633     }
634   }
635   const auto& verify_db_func = [&]() {
636     for (size_t i = 0; i != kNumTimestamps; ++i) {
637       ReadOptions ropts;
638       const Slice read_ts = read_ts_list[i];
639       ropts.timestamp = &read_ts;
640       std::string expected_timestamp(write_ts_list[i].data(),
641                                      write_ts_list[i].size());
642       for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
643         ColumnFamilyHandle* cfh = handles_[cf];
644         verify_records_func(i, 0, kNumKeysPerTimestamp - 1, cfh);
645       }
646     }
647   };
648   verify_db_func();
649   Close();
650 }
651 
TEST_F(DBBasicTestWithTimestamp,BatchWriteAndMultiGet)652 TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
653   const int kNumKeysPerFile = 8192;
654   const size_t kNumTimestamps = 2;
655   const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
656   Options options = CurrentOptions();
657   options.create_if_missing = true;
658   options.env = env_;
659   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
660 
661   size_t ts_sz = Timestamp(0, 0).size();
662   TestComparator test_cmp(ts_sz);
663   options.comparator = &test_cmp;
664   BlockBasedTableOptions bbto;
665   bbto.filter_policy.reset(NewBloomFilterPolicy(
666       10 /*bits_per_key*/, false /*use_block_based_builder*/));
667   bbto.whole_key_filtering = true;
668   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
669   DestroyAndReopen(options);
670   CreateAndReopenWithCF({"pikachu"}, options);
671   size_t num_cfs = handles_.size();
672   ASSERT_EQ(2, num_cfs);
673   std::vector<std::string> write_ts_list;
674   std::vector<std::string> read_ts_list;
675 
676   const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) {
677     std::vector<Slice> keys;
678     std::vector<std::string> key_vals;
679     std::vector<std::string> values;
680     std::vector<std::string> timestamps;
681 
682     for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
683       key_vals.push_back(Key1(j));
684     }
685     for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
686       keys.push_back(key_vals[j]);
687     }
688 
689     ReadOptions ropts;
690     const Slice read_ts = read_ts_list[i];
691     ropts.timestamp = &read_ts;
692     std::string expected_timestamp(write_ts_list[i].data(),
693                                    write_ts_list[i].size());
694 
695     std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
696     std::vector<Status> statuses =
697         db_->MultiGet(ropts, cfhs, keys, &values, &timestamps);
698     for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
699       ASSERT_OK(statuses[j]);
700       ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
701                 values[j]);
702       ASSERT_EQ(expected_timestamp, timestamps[j]);
703     }
704   };
705 
706   for (size_t i = 0; i != kNumTimestamps; ++i) {
707     write_ts_list.push_back(Timestamp(i * 2, 0));
708     read_ts_list.push_back(Timestamp(1 + i * 2, 0));
709     const Slice& write_ts = write_ts_list.back();
710     for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
711       WriteOptions wopts;
712       WriteBatch batch(0, 0, ts_sz);
713       for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
714         ASSERT_OK(
715             batch.Put(handles_[cf], Key1(j),
716                       "value_" + std::to_string(j) + "_" + std::to_string(i)));
717       }
718       batch.AssignTimestamp(write_ts);
719       ASSERT_OK(db_->Write(wopts, &batch));
720 
721       verify_records_func(i, handles_[cf]);
722 
723       ASSERT_OK(Flush(cf));
724     }
725   }
726 
727   const auto& verify_db_func = [&]() {
728     for (size_t i = 0; i != kNumTimestamps; ++i) {
729       ReadOptions ropts;
730       const Slice read_ts = read_ts_list[i];
731       ropts.timestamp = &read_ts;
732       for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
733         ColumnFamilyHandle* cfh = handles_[cf];
734         verify_records_func(i, cfh);
735       }
736     }
737   };
738   verify_db_func();
739   Close();
740 }
741 
742 #endif  // !ROCKSDB_LITE
743 
// Runs every DBBasicTestWithTimestampCompressionSettings test over the cross
// product of:
//   - filter policy: none, or a bloom filter built with
//     NewBloomFilterPolicy(10, false);
//   - each listed compression type (the tests themselves return early for
//     types that are unsupported in the build);
//   - 0 or 1 << 14 (16384) as the dictionary byte count (std::get<2>).
// The order of the values determines the generated test instance names.
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampCompressionSettings,
    ::testing::Combine(
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10, false))),
        ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression,
                          kLZ4HCCompression, kZSTD),
        ::testing::Values(0, 1 << 14)));
753 
754 class DBBasicTestWithTimestampPrefixSeek
755     : public DBBasicTestWithTimestampBase,
756       public testing::WithParamInterface<
757           std::tuple<std::shared_ptr<const SliceTransform>,
758                      std::shared_ptr<const FilterPolicy>, bool>> {
759  public:
DBBasicTestWithTimestampPrefixSeek()760   DBBasicTestWithTimestampPrefixSeek()
761       : DBBasicTestWithTimestampBase(
762             "/db_basic_test_with_timestamp_prefix_seek") {}
763 };
764 
TEST_P(DBBasicTestWithTimestampPrefixSeek,ForwardIterateWithPrefix)765 TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
766   const size_t kNumKeysPerFile = 128;
767   Options options = CurrentOptions();
768   options.env = env_;
769   options.create_if_missing = true;
770   // TODO(yanqin): re-enable auto compactions
771   options.disable_auto_compactions = true;
772   const size_t kTimestampSize = Timestamp(0, 0).size();
773   TestComparator test_cmp(kTimestampSize);
774   options.comparator = &test_cmp;
775   options.prefix_extractor = std::get<0>(GetParam());
776   options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
777   BlockBasedTableOptions bbto;
778   bbto.filter_policy = std::get<1>(GetParam());
779   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
780   DestroyAndReopen(options);
781 
782   const uint64_t kMaxKey = 0xffffffffffffffff;
783   const uint64_t kMinKey = 0xfffffffffffff000;
784   const std::vector<std::string> write_ts_list = {Timestamp(3, 0xffffffff),
785                                                   Timestamp(6, 0xffffffff)};
786   WriteOptions write_opts;
787   {
788     for (size_t i = 0; i != write_ts_list.size(); ++i) {
789       Slice write_ts = write_ts_list[i];
790       write_opts.timestamp = &write_ts;
791       uint64_t key = kMinKey;
792       do {
793         Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
794         ASSERT_OK(s);
795         if (key == kMaxKey) {
796           break;
797         }
798         ++key;
799       } while (true);
800     }
801   }
802   const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
803                                                  Timestamp(9, 0xffffffff)};
804   {
805     ReadOptions read_opts;
806     read_opts.total_order_seek = false;
807     read_opts.prefix_same_as_start = std::get<2>(GetParam());
808     fprintf(stdout, "%s %s %d\n", options.prefix_extractor->Name(),
809             bbto.filter_policy ? bbto.filter_policy->Name() : "null",
810             static_cast<int>(read_opts.prefix_same_as_start));
811     for (size_t i = 0; i != read_ts_list.size(); ++i) {
812       Slice read_ts = read_ts_list[i];
813       read_opts.timestamp = &read_ts;
814       std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
815 
816       // Seek to kMaxKey
817       iter->Seek(Key1(kMaxKey));
818       CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i),
819                          write_ts_list[i]);
820       iter->Next();
821       ASSERT_FALSE(iter->Valid());
822     }
823     const std::vector<uint64_t> targets = {kMinKey, kMinKey + 0x10,
824                                            kMinKey + 0x100, kMaxKey};
825     const SliceTransform* const pe = options.prefix_extractor.get();
826     ASSERT_NE(nullptr, pe);
827     const size_t kPrefixShift =
828         8 * (Key1(0).size() - pe->Transform(Key1(0)).size());
829     const uint64_t kPrefixMask =
830         ~((static_cast<uint64_t>(1) << kPrefixShift) - 1);
831     const uint64_t kNumKeysWithinPrefix =
832         (static_cast<uint64_t>(1) << kPrefixShift);
833     for (size_t i = 0; i != read_ts_list.size(); ++i) {
834       Slice read_ts = read_ts_list[i];
835       read_opts.timestamp = &read_ts;
836       std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
837       for (size_t j = 0; j != targets.size(); ++j) {
838         std::string start_key = Key1(targets[j]);
839         uint64_t expected_ub =
840             (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix;
841         uint64_t expected_key = targets[j];
842         size_t count = 0;
843         it->Seek(Key1(targets[j]));
844         while (it->Valid()) {
845           std::string saved_prev_key;
846           saved_prev_key.assign(it->key().data(), it->key().size());
847 
848           // Out of prefix
849           if (!read_opts.prefix_same_as_start &&
850               pe->Transform(saved_prev_key) != pe->Transform(start_key)) {
851             break;
852           }
853           CheckIterUserEntry(it.get(), Key1(expected_key),
854                              "value" + std::to_string(i), write_ts_list[i]);
855           ++count;
856           ++expected_key;
857           it->Next();
858         }
859         ASSERT_EQ(expected_ub - targets[j] + 1, count);
860       }
861     }
862   }
863   Close();
864 }
865 
// TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g.
// NoopTransform.
//
// Instantiates every DBBasicTestWithTimestampPrefixSeek test over the cross
// product of three parameter lists:
//   1. prefix extractor: fixed-length prefixes of 4, 7 and 8 bytes (test keys
//      produced by Key1() are 8 bytes long);
//   2. filter policy: none, or a bloom filter with 10 or 20 bits per key;
//   3. prefix_same_as_start: false and true (::testing::Bool()).
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampPrefixSeek,
    ::testing::Combine(
        ::testing::Values(
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10 /*bits_per_key*/, false)),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(20 /*bits_per_key*/,
                                                   false))),
        ::testing::Bool()));
882 
883 }  // namespace ROCKSDB_NAMESPACE
884 
#ifndef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
// Default build: no custom objects are linked in, so registration is a no-op.
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#else
// Custom-objects build: the definition is supplied by a statically linked
// library, declared here with C linkage.
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
892 
main(int argc,char ** argv)893 int main(int argc, char** argv) {
894   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
895   ::testing::InitGoogleTest(&argc, argv);
896   RegisterCustomObjects(argc, argv);
897   return RUN_ALL_TESTS();
898 }
899