1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12 #include "rocksdb/perf_context.h"
13 #include "rocksdb/utilities/debug.h"
14 #include "table/block_based/block_based_table_reader.h"
15 #include "table/block_based/block_builder.h"
16 #include "test_util/fault_injection_test_env.h"
17 #if !defined(ROCKSDB_LITE)
18 #include "test_util/sync_point.h"
19 #endif
20
21 namespace ROCKSDB_NAMESPACE {
22 class DBBasicTestWithTimestampBase : public DBTestBase {
23 public:
DBBasicTestWithTimestampBase(const std::string & dbname)24 explicit DBBasicTestWithTimestampBase(const std::string& dbname)
25 : DBTestBase(dbname) {}
26
27 protected:
Key1(uint64_t k)28 static std::string Key1(uint64_t k) {
29 uint32_t x = 1;
30 const bool is_little_endian = (*reinterpret_cast<char*>(&x) != 0);
31 std::string ret;
32 if (is_little_endian) {
33 ret.assign(reinterpret_cast<char*>(&k), sizeof(k));
34 } else {
35 ret.resize(sizeof(k));
36 ret[0] = k & 0xff;
37 ret[1] = (k >> 8) & 0xff;
38 ret[2] = (k >> 16) & 0xff;
39 ret[3] = (k >> 24) & 0xff;
40 ret[4] = (k >> 32) & 0xff;
41 ret[5] = (k >> 40) & 0xff;
42 ret[6] = (k >> 48) & 0xff;
43 ret[7] = (k >> 56) & 0xff;
44 }
45 std::reverse(ret.begin(), ret.end());
46 return ret;
47 }
48
49 class TestComparator : public Comparator {
50 private:
51 const Comparator* cmp_without_ts_;
52
53 public:
TestComparator(size_t ts_sz)54 explicit TestComparator(size_t ts_sz)
55 : Comparator(ts_sz), cmp_without_ts_(nullptr) {
56 cmp_without_ts_ = BytewiseComparator();
57 }
58
Name() const59 const char* Name() const override { return "TestComparator"; }
60
FindShortSuccessor(std::string *) const61 void FindShortSuccessor(std::string*) const override {}
62
FindShortestSeparator(std::string *,const Slice &) const63 void FindShortestSeparator(std::string*, const Slice&) const override {}
64
Compare(const Slice & a,const Slice & b) const65 int Compare(const Slice& a, const Slice& b) const override {
66 int r = CompareWithoutTimestamp(a, b);
67 if (r != 0 || 0 == timestamp_size()) {
68 return r;
69 }
70 return -CompareTimestamp(
71 Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
72 Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
73 }
74
75 using Comparator::CompareWithoutTimestamp;
CompareWithoutTimestamp(const Slice & a,bool a_has_ts,const Slice & b,bool b_has_ts) const76 int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
77 bool b_has_ts) const override {
78 if (a_has_ts) {
79 assert(a.size() >= timestamp_size());
80 }
81 if (b_has_ts) {
82 assert(b.size() >= timestamp_size());
83 }
84 Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a;
85 Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b;
86 return cmp_without_ts_->Compare(lhs, rhs);
87 }
88
CompareTimestamp(const Slice & ts1,const Slice & ts2) const89 int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
90 if (!ts1.data() && !ts2.data()) {
91 return 0;
92 } else if (ts1.data() && !ts2.data()) {
93 return 1;
94 } else if (!ts1.data() && ts2.data()) {
95 return -1;
96 }
97 assert(ts1.size() == ts2.size());
98 uint64_t low1 = 0;
99 uint64_t low2 = 0;
100 uint64_t high1 = 0;
101 uint64_t high2 = 0;
102 const size_t kSize = ts1.size();
103 std::unique_ptr<char[]> ts1_buf(new char[kSize]);
104 memcpy(ts1_buf.get(), ts1.data(), ts1.size());
105 std::unique_ptr<char[]> ts2_buf(new char[kSize]);
106 memcpy(ts2_buf.get(), ts2.data(), ts2.size());
107 Slice ts1_copy = Slice(ts1_buf.get(), kSize);
108 Slice ts2_copy = Slice(ts2_buf.get(), kSize);
109 auto* ptr1 = const_cast<Slice*>(&ts1_copy);
110 auto* ptr2 = const_cast<Slice*>(&ts2_copy);
111 if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
112 !GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
113 assert(false);
114 }
115 if (high1 < high2) {
116 return -1;
117 } else if (high1 > high2) {
118 return 1;
119 }
120 if (low1 < low2) {
121 return -1;
122 } else if (low1 > low2) {
123 return 1;
124 }
125 return 0;
126 }
127 };
128
Timestamp(uint64_t low,uint64_t high)129 std::string Timestamp(uint64_t low, uint64_t high) {
130 std::string ts;
131 PutFixed64(&ts, low);
132 PutFixed64(&ts, high);
133 return ts;
134 }
135
CheckIterUserEntry(const Iterator * it,const Slice & expected_key,const Slice & expected_value,const Slice & expected_ts) const136 void CheckIterUserEntry(const Iterator* it, const Slice& expected_key,
137 const Slice& expected_value,
138 const Slice& expected_ts) const {
139 ASSERT_TRUE(it->Valid());
140 ASSERT_OK(it->status());
141 ASSERT_EQ(expected_key, it->key());
142 ASSERT_EQ(expected_value, it->value());
143 ASSERT_EQ(expected_ts, it->timestamp());
144 }
145
CheckIterEntry(const Iterator * it,const Slice & expected_ukey,SequenceNumber expected_seq,ValueType expected_val_type,const Slice & expected_value,const Slice & expected_ts)146 void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
147 SequenceNumber expected_seq, ValueType expected_val_type,
148 const Slice& expected_value, const Slice& expected_ts) {
149 ASSERT_TRUE(it->Valid());
150 ASSERT_OK(it->status());
151 std::string ukey_and_ts;
152 ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
153 ukey_and_ts.append(expected_ts.data(), expected_ts.size());
154 ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type);
155 std::string ikey;
156 AppendInternalKey(&ikey, parsed_ikey);
157 ASSERT_EQ(Slice(ikey), it->key());
158 if (expected_val_type == kTypeValue) {
159 ASSERT_EQ(expected_value, it->value());
160 }
161 ASSERT_EQ(expected_ts, it->timestamp());
162 }
163 };
164
165 class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
166 public:
DBBasicTestWithTimestamp()167 DBBasicTestWithTimestamp()
168 : DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
169 };
170
// Writes keys [start_keys[i], kMaxKey] at write_timestamps[i] for two rounds.
// For each read timestamp it then checks forward iteration twice: first
// unbounded from Key1(0), then via SeekToFirst() with [lower, upper) iterate
// bounds that shrink from both ends. Every entry must carry the matching
// round's value and timestamp.
TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 1024;
  Options options = CurrentOptions();
  options.env = env_;
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  options.create_if_missing = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  // Read timestamp 2 sees only round 0 (written at 1). Read timestamp 4 sees
  // round 1 (written at 3), which covers every key round 0 wrote.
  const std::vector<uint64_t> start_keys = {1, 0};
  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                     Timestamp(3, 0)};
  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
                                                    Timestamp(4, 0)};
  for (size_t i = 0; i < write_timestamps.size(); ++i) {
    WriteOptions write_opts;
    Slice write_ts = write_timestamps[i];
    write_opts.timestamp = &write_ts;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
      ASSERT_OK(s);
    }
  }
  for (size_t i = 0; i < read_timestamps.size(); ++i) {
    ReadOptions read_opts;
    Slice read_ts = read_timestamps[i];
    read_opts.timestamp = &read_ts;
    const std::string expected_value = "value" + std::to_string(i);
    {
      std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
      // uint64_t counters avoid mixing signed and unsigned values in the
      // comparisons against the uint64_t key arithmetic below.
      uint64_t count = 0;
      uint64_t key = start_keys[i];
      for (it->Seek(Key1(0)); it->Valid(); it->Next(), ++count, ++key) {
        CheckIterUserEntry(it.get(), Key1(key), expected_value,
                           write_timestamps[i]);
      }
      ASSERT_EQ(kMaxKey - start_keys[i] + 1, count);
    }

    // SeekToFirst() with a lower bound, then iterate within [l, r). The window
    // shrinks from both ends on every round.
    uint64_t l = 0;
    uint64_t r = kMaxKey + 1;
    while (l < r) {
      std::string lb_str = Key1(l);
      Slice lb = lb_str;
      std::string ub_str = Key1(r);
      Slice ub = ub_str;
      read_opts.iterate_lower_bound = &lb;
      read_opts.iterate_upper_bound = &ub;
      // read_opts holds the bounds only as pointers to `lb`/`ub`. Scoping the
      // iterator to this loop body ensures it never outlives them.
      std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
      const uint64_t first_key = std::max(l, start_keys[i]);
      uint64_t count = 0;
      for (it->SeekToFirst(); it->Valid(); it->Next(), ++count) {
        CheckIterUserEntry(it.get(), Key1(first_key + count), expected_value,
                           write_timestamps[i]);
      }
      ASSERT_EQ(r - first_key, count);
      l += (kMaxKey / 100);
      r -= (kMaxKey / 100);
    }
  }
  Close();
}
237
// Writes kNumTimestamps rounds, each covering keys [kMinKey, kMaxKey] (the top
// 1024 uint64 values), and records the DB's latest sequence number after each
// round in start_seqs. For each read timestamp i, it opens an iterator with
// iter_start_seqnum = start_seqs[i]. It expects to see, in key order, exactly
// the entries written by round i + 1, with consecutive sequence numbers
// starting at start_seqs[i] + 1.
// NOTE(review): the read loop does not assert how many entries were visited,
// so an iterator that returns nothing would also pass.
TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
  const int kNumKeysPerFile = 128;
  const uint64_t kMaxKey = 0xffffffffffffffff;
  const uint64_t kMinKey = kMaxKey - 1023;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  // start_seqs[i]: latest sequence number right after write round i.
  std::vector<SequenceNumber> start_seqs;

  const int kNumTimestamps = 4;
  // Round t writes at timestamp (low = 2 * t, high = 17). The high half is
  // identical for all rounds, so the low half alone orders them.
  std::vector<std::string> write_ts_list;
  for (int t = 0; t != kNumTimestamps; ++t) {
    write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
  }
  WriteOptions write_opts;
  for (size_t i = 0; i != write_ts_list.size(); ++i) {
    Slice write_ts = write_ts_list[i];
    write_opts.timestamp = &write_ts;
    uint64_t k = kMinKey;
    // Covers kMinKey..kMaxKey inclusive. The loop exits through the break at
    // kMaxKey before ++k could wrap to 0; `k != 0` is only a backstop.
    do {
      Status s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
      ASSERT_OK(s);
      if (k == kMaxKey) {
        break;
      }
      ++k;
    } while (k != 0);
    start_seqs.push_back(db_->GetLatestSequenceNumber());
  }
  // read_ts_list[t] has low = 2 * t + 3. That is newer than round t + 1's
  // timestamp (2 * t + 2) and older than round t + 2's (2 * t + 4).
  std::vector<std::string> read_ts_list;
  for (int t = 0; t != kNumTimestamps - 1; ++t) {
    read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17));
  }
  ReadOptions read_opts;
  for (size_t i = 0; i != read_ts_list.size(); ++i) {
    Slice read_ts = read_ts_list[i];
    read_opts.timestamp = &read_ts;
    read_opts.iter_start_seqnum = start_seqs[i];
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    // Round i + 1's Puts received consecutive sequence numbers right after
    // start_seqs[i], in increasing key order.
    SequenceNumber expected_seq = start_seqs[i] + 1;
    uint64_t key = kMinKey;
    for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) {
      CheckIterEntry(iter.get(), Key1(key), expected_seq, kTypeValue,
                     "value" + std::to_string(i + 1), write_ts_list[i + 1]);
      ++key;
      ++expected_seq;
    }
  }
  Close();
}
295
// Writes kNumKeys versions of "foo" (timestamps 1..kNumKeys) and reads at
// timestamp 1. Reaching that oldest version means stepping over kNumKeys - 1
// newer versions, more than max_sequential_skip_in_iterations, so the test
// expects exactly one reseek.
TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
  constexpr size_t kNumKeys = 16;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.max_sequential_skip_in_iterations = kNumKeys / 2;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  TestComparator test_cmp(Timestamp(0, 0).size());
  options.comparator = &test_cmp;
  DestroyAndReopen(options);

  WriteOptions write_opts;
  for (uint64_t version = 1; version <= kNumKeys; ++version) {
    const std::string write_ts_buf = Timestamp(version, 0);
    Slice write_ts = write_ts_buf;
    write_opts.timestamp = &write_ts;
    ASSERT_OK(
        db_->Put(write_opts, "foo", "value" + std::to_string(version - 1)));
  }

  {
    const std::string read_ts_buf = Timestamp(1, 0);
    Slice read_ts = read_ts_buf;
    ReadOptions read_opts;
    read_opts.timestamp = &read_ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->SeekToFirst();
    CheckIterUserEntry(iter.get(), "foo", "value0", read_ts_buf);
    ASSERT_EQ(
        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  }
  Close();
}
332
// Writes kNumKeys versions of "a". It then writes, in one batch at the newest
// timestamp, one more version of "a" plus the only version of "b". Reading at
// that timestamp, Next() from "a" has to get past kNumKeys older versions of
// "a" (more than max_sequential_skip_in_iterations). The test therefore
// expects exactly one reseek before the iterator lands on "b".
TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  constexpr size_t kNumKeys = 16;
  options.max_sequential_skip_in_iterations = kNumKeys / 2;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  DestroyAndReopen(options);
  // Write kNumKeys versions of "a" at timestamps 1..kNumKeys.
  WriteOptions write_opts;
  Status s;
  for (size_t i = 0; i != kNumKeys; ++i) {
    std::string ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
    Slice ts = ts_str;
    write_opts.timestamp = &ts;
    s = db_->Put(write_opts, "a", "value" + std::to_string(i));
    ASSERT_OK(s);
  }
  // write_opts.timestamp now points at a Slice from the finished loop. The
  // batch below carries its own timestamps, so drop the dangling pointer.
  write_opts.timestamp = nullptr;
  {
    // One batch at timestamp kNumKeys + 1: a newer "a" and the only "b".
    std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
    WriteBatch batch(0, 0, kTimestampSize);
    ASSERT_OK(batch.Put("a", "new_value"));
    ASSERT_OK(batch.Put("b", "new_value"));
    s = batch.AssignTimestamp(ts_str);
    ASSERT_OK(s);
    s = db_->Write(write_opts, &batch);
    ASSERT_OK(s);
  }
  {
    ReadOptions read_opts;
    std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
    Slice ts = ts_str;
    read_opts.timestamp = &ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->Seek("a");
    iter->Next();
    CheckIterUserEntry(iter.get(), "b", "new_value", ts_str);
    ASSERT_EQ(
        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  }
  Close();
}
380
// Writes "a" once (timestamp 1) and "b" kNumKeys times (timestamps 1..kNumKeys),
// then reads at timestamp 1 with max_skippable_internal_keys = 2. Moving from
// "a" to "b" requires skipping kNumKeys - 1 = 3 newer versions of "b", which
// exceeds the limit, so the iterator status is expected to be Incomplete.
TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  TestComparator test_cmp(Timestamp(0, 0).size());
  options.comparator = &test_cmp;
  DestroyAndReopen(options);

  constexpr size_t max_skippable_internal_keys = 2;
  const size_t kNumKeys = max_skippable_internal_keys + 2;

  // Writes key/value at timestamp (ts_low, 0) and returns the Put() status so
  // that the ASSERT_OK at each call site aborts the test itself on failure.
  WriteOptions write_opts;
  const auto put_at = [&](uint64_t ts_low, const std::string& key,
                          const std::string& value) {
    const std::string ts_buf = Timestamp(ts_low, 0);
    Slice ts = ts_buf;
    write_opts.timestamp = &ts;
    return db_->Put(write_opts, key, value);
  };

  ASSERT_OK(put_at(1, "a", "value"));
  for (uint64_t ts_low = 1; ts_low <= kNumKeys; ++ts_low) {
    ASSERT_OK(put_at(ts_low, "b", "value" + std::to_string(ts_low - 1)));
  }

  {
    const std::string read_ts_buf = Timestamp(1, 0);
    Slice read_ts = read_ts_buf;
    ReadOptions read_opts;
    read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
    read_opts.timestamp = &read_ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->SeekToFirst();
    iter->Next();
    ASSERT_TRUE(iter->status().IsIncomplete());
  }
  Close();
}
419
420 class DBBasicTestWithTimestampCompressionSettings
421 : public DBBasicTestWithTimestampBase,
422 public testing::WithParamInterface<std::tuple<
423 std::shared_ptr<const FilterPolicy>, CompressionType, uint32_t>> {
424 public:
DBBasicTestWithTimestampCompressionSettings()425 DBBasicTestWithTimestampCompressionSettings()
426 : DBBasicTestWithTimestampBase(
427 "db_basic_test_with_timestamp_compression") {}
428 };
429
// For each (filter policy, compression, max_dict_bytes) parameter tuple:
// writes kKeysPerTs keys into each of two column families at each of
// kNumTimestamps even timestamps. It then checks that a Get() at the following
// odd timestamp returns the value written at that even timestamp.
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
  const int kNumKeysPerFile = 1024;
  const size_t kNumTimestamps = 4;
  const size_t kKeysPerTs = (kNumKeysPerFile - 1) / kNumTimestamps;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  TestComparator test_cmp(Timestamp(0, 0).size());
  options.comparator = &test_cmp;

  const auto& param = GetParam();
  BlockBasedTableOptions table_opts;
  table_opts.filter_policy = std::get<0>(param);
  table_opts.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_opts));

  const CompressionType comp_type = std::get<1>(param);
  // Skip compression types this build cannot exercise.
#if LZ4_VERSION_NUMBER < 10400  // r124+
  if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
    return;
  }
#endif  // LZ4_VERSION_NUMBER < 10400
  if (!ZSTD_Supported() && comp_type == kZSTD) {
    return;
  }
  if (!Zlib_Supported() && comp_type == kZlibCompression) {
    return;
  }

  const uint32_t dict_bytes = std::get<2>(param);
  options.compression = comp_type;
  options.compression_opts.max_dict_bytes = dict_bytes;
  if (comp_type == kZSTD) {
    options.compression_opts.zstd_max_train_bytes = dict_bytes;
  }
  options.target_file_size_base = 1 << 26;  // 64MB
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  const size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);

  std::vector<std::string> write_ts_list;
  std::vector<std::string> read_ts_list;
  for (size_t ts_idx = 0; ts_idx != kNumTimestamps; ++ts_idx) {
    // Writes happen at even timestamps; each read is one tick later.
    write_ts_list.push_back(Timestamp(ts_idx * 2, 0));
    read_ts_list.push_back(Timestamp(ts_idx * 2 + 1, 0));
    const Slice write_ts = write_ts_list.back();
    WriteOptions wopts;
    wopts.timestamp = &write_ts;
    for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
      for (size_t k = 0; k != kKeysPerTs; ++k) {
        const std::string value =
            "value_" + std::to_string(k) + "_" + std::to_string(ts_idx);
        ASSERT_OK(Put(cf, Key1(k), value, wopts));
      }
    }
  }

  // Each read timestamp must observe exactly the values written at the
  // immediately preceding write timestamp.
  const auto& verify_db_func = [&]() {
    for (size_t ts_idx = 0; ts_idx != kNumTimestamps; ++ts_idx) {
      ReadOptions ropts;
      const Slice read_ts = read_ts_list[ts_idx];
      ropts.timestamp = &read_ts;
      for (ColumnFamilyHandle* cfh : handles_) {
        for (size_t k = 0; k != kKeysPerTs; ++k) {
          std::string value;
          ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value));
          ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(ts_idx),
                    value);
        }
      }
    }
  };
  verify_db_func();
  Close();
}
504
505 #ifndef ROCKSDB_LITE
506 // A class which remembers the name of each flushed file.
507 class FlushedFileCollector : public EventListener {
508 public:
FlushedFileCollector()509 FlushedFileCollector() {}
~FlushedFileCollector()510 ~FlushedFileCollector() override {}
511
OnFlushCompleted(DB *,const FlushJobInfo & info)512 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
513 InstrumentedMutexLock lock(&mutex_);
514 flushed_files_.push_back(info.file_path);
515 }
516
GetFlushedFiles()517 std::vector<std::string> GetFlushedFiles() {
518 std::vector<std::string> result;
519 {
520 InstrumentedMutexLock lock(&mutex_);
521 result = flushed_files_;
522 }
523 return result;
524 }
525
ClearFlushedFiles()526 void ClearFlushedFiles() {
527 InstrumentedMutexLock lock(&mutex_);
528 flushed_files_.clear();
529 }
530
531 private:
532 std::vector<std::string> flushed_files_;
533 InstrumentedMutex mutex_;
534 };
535
// For each timestamp i, writes kNumKeysPerTimestamp keys into each column
// family. At two split points it verifies the keys written since the last
// flush, flushes, and compacts the new SST file to level (kNumTimestamps - i),
// so older timestamps end up in lower levels. Finally it re-reads every key at
// every read timestamp. Each Get() must return the value and timestamp written
// at the preceding write timestamp.
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
  const int kNumKeysPerFile = 1024;
  const size_t kNumTimestamps = 2;
  const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
  const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));

  // Shared with options.listeners. The test keeps its own handle so it can
  // read and reset the list of flushed files.
  auto collector = std::make_shared<FlushedFileCollector>();
  options.listeners.push_back(collector);

  size_t ts_sz = Timestamp(0, 0).size();
  TestComparator test_cmp(ts_sz);
  options.comparator = &test_cmp;
  BlockBasedTableOptions bbto;
  bbto.filter_policy = std::get<0>(GetParam());
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));

  const CompressionType comp_type = std::get<1>(GetParam());
#if LZ4_VERSION_NUMBER < 10400  // r124+
  if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
    return;
  }
#endif  // LZ4_VERSION_NUMBER < 10400
  if (!ZSTD_Supported() && comp_type == kZSTD) {
    return;
  }
  if (!Zlib_Supported() && comp_type == kZlibCompression) {
    return;
  }

  options.compression = comp_type;
  options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
  if (comp_type == kZSTD) {
    options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
  }
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  std::vector<std::string> write_ts_list;
  std::vector<std::string> read_ts_list;

  // Get() keys [begin, end] of `cfh` at read_ts_list[i]. Both the value and the
  // returned timestamp must match what was written at write_ts_list[i].
  const auto& verify_records_func = [&](size_t i, size_t begin, size_t end,
                                        ColumnFamilyHandle* cfh) {
    std::string value;
    std::string timestamp;

    ReadOptions ropts;
    const Slice read_ts = read_ts_list[i];
    ropts.timestamp = &read_ts;
    const std::string expected_timestamp = write_ts_list[i];

    for (size_t j = begin; j <= end; ++j) {
      ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value, &timestamp));
      ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i), value);
      ASSERT_EQ(expected_timestamp, timestamp);
    }
  };

  for (size_t i = 0; i != kNumTimestamps; ++i) {
    write_ts_list.push_back(Timestamp(i * 2, 0));
    read_ts_list.push_back(Timestamp(1 + i * 2, 0));
    const Slice write_ts = write_ts_list.back();
    WriteOptions wopts;
    wopts.timestamp = &write_ts;
    for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
      // First key not yet verified since the previous flush.
      size_t memtable_get_start = 0;
      for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
        ASSERT_OK(Put(cf, Key1(j),
                      "value_" + std::to_string(j) + "_" + std::to_string(i),
                      wopts));
        if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
          verify_records_func(i, memtable_get_start, j, handles_[cf]);
          memtable_get_start = j + 1;

          // Flush all keys with the same timestamp to two sst files, split at
          // incremental positions such that lowerlevel[1].smallest.userkey ==
          // higherlevel[0].largest.userkey.
          ASSERT_OK(Flush(cf));

          // Compact files (2 at each level) to a lower level such that all
          // keys with the same timestamp are at one level, with newer versions
          // at higher levels.
          CompactionOptions compact_opt;
          compact_opt.compression = kNoCompression;
          ASSERT_OK(db_->CompactFiles(compact_opt, handles_[cf],
                                      collector->GetFlushedFiles(),
                                      static_cast<int>(kNumTimestamps - i)));
          collector->ClearFlushedFiles();
        }
      }
    }
  }
  // Re-read every key of every column family at every read timestamp.
  const auto& verify_db_func = [&]() {
    for (size_t i = 0; i != kNumTimestamps; ++i) {
      for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
        verify_records_func(i, 0, kNumKeysPerTimestamp - 1, handles_[cf]);
      }
    }
  };
  verify_db_func();
  Close();
}
651
// For each of two timestamps, writes kNumKeysPerTimestamp keys into each column
// family with a single WriteBatch. It verifies them with MultiGet() (values and
// returned timestamps) and then flushes. At the end it re-verifies every
// timestamp.
TEST_F(DBBasicTestWithTimestamp, BatchWriteAndMultiGet) {
  const int kNumKeysPerFile = 8192;
  const size_t kNumTimestamps = 2;
  const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));

  size_t ts_sz = Timestamp(0, 0).size();
  TestComparator test_cmp(ts_sz);
  options.comparator = &test_cmp;
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(
      10 /*bits_per_key*/, false /*use_block_based_builder*/));
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  std::vector<std::string> write_ts_list;
  std::vector<std::string> read_ts_list;

  // MultiGet() every key of `cfh` at read_ts_list[i]. Each value and returned
  // timestamp must match what was written at write_ts_list[i].
  const auto& verify_records_func = [&](size_t i, ColumnFamilyHandle* cfh) {
    // key_vals owns the key bytes and is fully built before `keys` takes
    // Slices into it, so no reallocation can invalidate those Slices.
    std::vector<std::string> key_vals;
    key_vals.reserve(kNumKeysPerTimestamp);
    for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
      key_vals.push_back(Key1(j));
    }
    const std::vector<Slice> keys(key_vals.begin(), key_vals.end());

    ReadOptions ropts;
    const Slice read_ts = read_ts_list[i];
    ropts.timestamp = &read_ts;
    const std::string expected_timestamp = write_ts_list[i];

    std::vector<std::string> values;
    std::vector<std::string> timestamps;
    std::vector<ColumnFamilyHandle*> cfhs(keys.size(), cfh);
    std::vector<Status> statuses =
        db_->MultiGet(ropts, cfhs, keys, &values, &timestamps);
    // Guard the indexing below.
    ASSERT_EQ(kNumKeysPerTimestamp, statuses.size());
    ASSERT_EQ(kNumKeysPerTimestamp, values.size());
    ASSERT_EQ(kNumKeysPerTimestamp, timestamps.size());
    for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
      ASSERT_OK(statuses[j]);
      ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
                values[j]);
      ASSERT_EQ(expected_timestamp, timestamps[j]);
    }
  };

  for (size_t i = 0; i != kNumTimestamps; ++i) {
    write_ts_list.push_back(Timestamp(i * 2, 0));
    read_ts_list.push_back(Timestamp(1 + i * 2, 0));
    const Slice write_ts = write_ts_list.back();
    for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
      WriteOptions wopts;
      WriteBatch batch(0, 0, ts_sz);
      for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
        ASSERT_OK(
            batch.Put(handles_[cf], Key1(j),
                      "value_" + std::to_string(j) + "_" + std::to_string(i)));
      }
      ASSERT_OK(batch.AssignTimestamp(write_ts));
      ASSERT_OK(db_->Write(wopts, &batch));

      verify_records_func(i, handles_[cf]);

      ASSERT_OK(Flush(cf));
    }
  }

  // Re-verify every timestamp in every column family after all flushes.
  const auto& verify_db_func = [&]() {
    for (size_t i = 0; i != kNumTimestamps; ++i) {
      for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
        verify_records_func(i, handles_[cf]);
      }
    }
  };
  verify_db_func();
  Close();
}
741
742 #endif // !ROCKSDB_LITE
743
// Runs every DBBasicTestWithTimestampCompressionSettings test over the cross
// product of:
//   filter policy: {none, Bloom filter with 10 bits per key}
//   compression:   {none, zlib, LZ4, LZ4HC, ZSTD}
//   max_dict_bytes (also zstd_max_train_bytes for ZSTD): {0, 1 << 14}
// The tests return early for compression types that are unavailable in the
// build.
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampCompressionSettings,
    ::testing::Combine(
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10, false))),
        ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression,
                          kLZ4HCCompression, kZSTD),
        ::testing::Values(0, 1 << 14)));
753
754 class DBBasicTestWithTimestampPrefixSeek
755 : public DBBasicTestWithTimestampBase,
756 public testing::WithParamInterface<
757 std::tuple<std::shared_ptr<const SliceTransform>,
758 std::shared_ptr<const FilterPolicy>, bool>> {
759 public:
DBBasicTestWithTimestampPrefixSeek()760 DBBasicTestWithTimestampPrefixSeek()
761 : DBBasicTestWithTimestampBase(
762 "/db_basic_test_with_timestamp_prefix_seek") {}
763 };
764
// Writes keys [kMinKey, kMaxKey] (the top 4096 uint64 values) at two
// timestamps. Each read timestamp sees one of those writes. The test checks
// prefix-based forward iteration in two ways:
// - Seek(kMaxKey) yields exactly one entry.
// - A Seek() to each target yields every key from the target through the last
//   key that shares its prefix under the fixed-length extractor being tested.
TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
  const size_t kNumKeysPerFile = 128;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  // TODO(yanqin): re-enable auto compactions
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.prefix_extractor = std::get<0>(GetParam());
  // Validate the extractor before anything below dereferences it.
  const SliceTransform* const pe = options.prefix_extractor.get();
  ASSERT_NE(nullptr, pe);
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  BlockBasedTableOptions bbto;
  bbto.filter_policy = std::get<1>(GetParam());
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const uint64_t kMaxKey = 0xffffffffffffffff;
  const uint64_t kMinKey = 0xfffffffffffff000;
  const std::vector<std::string> write_ts_list = {Timestamp(3, 0xffffffff),
                                                  Timestamp(6, 0xffffffff)};
  WriteOptions write_opts;
  {
    for (size_t i = 0; i != write_ts_list.size(); ++i) {
      Slice write_ts = write_ts_list[i];
      write_opts.timestamp = &write_ts;
      uint64_t key = kMinKey;
      // Covers kMinKey..kMaxKey inclusive without relying on wrap-around.
      do {
        Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
        ASSERT_OK(s);
        if (key == kMaxKey) {
          break;
        }
        ++key;
      } while (true);
    }
  }
  // Read timestamp 5 sees the write at 3; read timestamp 9 sees the write at 6.
  const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
                                                 Timestamp(9, 0xffffffff)};
  {
    ReadOptions read_opts;
    read_opts.total_order_seek = false;
    read_opts.prefix_same_as_start = std::get<2>(GetParam());
    fprintf(stdout, "%s %s %d\n", pe->Name(),
            bbto.filter_policy ? bbto.filter_policy->Name() : "null",
            static_cast<int>(read_opts.prefix_same_as_start));
    for (size_t i = 0; i != read_ts_list.size(); ++i) {
      Slice read_ts = read_ts_list[i];
      read_opts.timestamp = &read_ts;
      std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));

      // Seek to kMaxKey: it is the last key, so exactly one entry follows.
      iter->Seek(Key1(kMaxKey));
      CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i),
                         write_ts_list[i]);
      iter->Next();
      ASSERT_FALSE(iter->Valid());
    }
    const std::vector<uint64_t> targets = {kMinKey, kMinKey + 0x10,
                                           kMinKey + 0x100, kMaxKey};
    // Number of low-order key bits outside the prefix.
    const size_t kPrefixShift =
        8 * (Key1(0).size() - pe->Transform(Key1(0)).size());
    const uint64_t kPrefixMask =
        ~((static_cast<uint64_t>(1) << kPrefixShift) - 1);
    const uint64_t kNumKeysWithinPrefix =
        (static_cast<uint64_t>(1) << kPrefixShift);
    for (size_t i = 0; i != read_ts_list.size(); ++i) {
      Slice read_ts = read_ts_list[i];
      read_opts.timestamp = &read_ts;
      std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
      for (size_t j = 0; j != targets.size(); ++j) {
        const std::string start_key = Key1(targets[j]);
        // Last key sharing start_key's prefix.
        const uint64_t expected_ub =
            (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix;
        uint64_t expected_key = targets[j];
        size_t count = 0;
        it->Seek(start_key);
        while (it->Valid()) {
          const std::string cur_key(it->key().data(), it->key().size());

          // Without prefix_same_as_start the test does not rely on the
          // iterator stopping at the prefix boundary, so it stops itself.
          if (!read_opts.prefix_same_as_start &&
              pe->Transform(cur_key) != pe->Transform(start_key)) {
            break;
          }
          CheckIterUserEntry(it.get(), Key1(expected_key),
                             "value" + std::to_string(i), write_ts_list[i]);
          ++count;
          ++expected_key;
          it->Next();
        }
        ASSERT_EQ(expected_ub - targets[j] + 1, count);
      }
    }
  }
  Close();
}
865
// TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g.
// NoopTransform.
//
// Parameters are the cross product of:
//   prefix extractor: fixed-length, 4, 7 or 8 bytes (Key1() keys are 8 bytes,
//                     so the 8-byte extractor makes the whole key the prefix)
//   filter policy:    {none, Bloom 10 bits per key, Bloom 20 bits per key}
//   ReadOptions::prefix_same_as_start: {false, true}
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampPrefixSeek,
    ::testing::Combine(
        ::testing::Values(
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
            std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
        ::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(10 /*bits_per_key*/, false)),
                          std::shared_ptr<const FilterPolicy>(
                              NewBloomFilterPolicy(20 /*bits_per_key*/,
                                                   false))),
        ::testing::Bool()));
882
883 } // namespace ROCKSDB_NAMESPACE
884
// Hook that main() calls with the arguments left after gtest's parsing.
// With ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS defined, it is
// only declared here (with C linkage) and must be defined elsewhere,
// presumably by the linked static libraries. Otherwise a no-op definition is
// provided.
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
892
main(int argc,char ** argv)893 int main(int argc, char** argv) {
894 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
895 ::testing::InitGoogleTest(&argc, argv);
896 RegisterCustomObjects(argc, argv);
897 return RUN_ALL_TESTS();
898 }
899