1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12 #include "rocksdb/perf_context.h"
13 #include "rocksdb/utilities/debug.h"
14 #include "table/block_based/block_based_table_reader.h"
15 #include "table/block_based/block_builder.h"
16 #include "test_util/fault_injection_test_env.h"
17 #if !defined(ROCKSDB_LITE)
18 #include "test_util/sync_point.h"
19 #endif
20
21 namespace ROCKSDB_NAMESPACE {
22
// Test fixture for basic DB functionality tests. DBTestBase creates a
// fresh database under the "/db_basic_test" per-test directory.
class DBBasicTest : public DBTestBase {
 public:
  DBBasicTest() : DBTestBase("/db_basic_test") {}
};
27
// Opening the same database a second time from this process must fail
// with an IOError whose message mentions the lock file.
TEST_F(DBBasicTest, OpenWhenOpen) {
  Options opts = CurrentOptions();
  opts.env = env_;
  ROCKSDB_NAMESPACE::DB* second_handle = nullptr;
  ROCKSDB_NAMESPACE::Status open_status =
      DB::Open(opts, dbname_, &second_handle);

  ASSERT_EQ(Status::Code::kIOError, open_status.code());
  ASSERT_EQ(Status::SubCode::kNone, open_status.subcode());
  ASSERT_TRUE(strstr(open_status.getState(), "lock ") != nullptr);

  delete second_handle;
}
40
41 #ifndef ROCKSDB_LITE
// Writes a few keys, then reopens the DB in read-only mode and checks
// point lookups, iteration, and that WAL syncing is rejected.
TEST_F(DBBasicTest, ReadOnlyDB) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto opts = CurrentOptions();
  assert(opts.env == env_);
  ASSERT_OK(ReadOnlyReopen(opts));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));

  // The iterator must see exactly the two live keys.
  Iterator* it = db_->NewIterator(ReadOptions());
  int num_seen = 0;
  it->SeekToFirst();
  while (it->Valid()) {
    ASSERT_OK(it->status());
    ++num_seen;
    it->Next();
  }
  ASSERT_EQ(num_seen, 2);
  delete it;
  Close();

  // Reopen read-write and flush the memtable to SST files.
  Reopen(opts);
  Flush();
  Close();

  // The flushed data must still be visible in read-only mode, and
  // SyncWAL is not supported there.
  ASSERT_OK(ReadOnlyReopen(opts));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
}
73
// Same flow as ReadOnlyDB, but with options.write_dbid_to_manifest set:
// the DB identity reported across read-only reopens must be identical.
// Fix: GetDbIdentity() and Flush() return Status; the results were
// silently dropped — wrap them in ASSERT_OK so failures surface.
TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  options.write_dbid_to_manifest = true;
  assert(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  std::string db_id1;
  ASSERT_OK(db_->GetDbIdentity(db_id1));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  Iterator* iter = db_->NewIterator(ReadOptions());
  int count = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ASSERT_OK(iter->status());
    ++count;
  }
  ASSERT_EQ(count, 2);
  delete iter;
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  ASSERT_OK(Flush());
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  // Identity must be stable because it was persisted in the MANIFEST.
  std::string db_id2;
  ASSERT_OK(db_->GetDbIdentity(db_id2));
  ASSERT_EQ(db_id1, db_id2);
}
111
// Exercises the open modes reachable via ReadOnlyReopen:
//  - plain read-only DB (default max_open_files),
//  - compacted-DB mode (max_open_files == -1, all data on one level),
//  - fallback to read-only mode when multiple L0 files exist.
// Fix: Flush() and CompactRange() return Status; the results were
// silently dropped — wrap them in ASSERT_OK.
TEST_F(DBBasicTest, CompactedDB) {
  const uint64_t kFileSize = 1 << 20;
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = kFileSize;
  options.target_file_size_base = kFileSize;
  options.max_bytes_for_level_base = 1 << 30;
  options.compression = kNoCompression;
  Reopen(options);
  // 1 L0 file, use CompactedDB if max_open_files = -1
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
  ASSERT_OK(Flush());
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  Status s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  Reopen(options);
  // Add more L0 files
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
  ASSERT_OK(Flush());
  Close();

  ASSERT_OK(ReadOnlyReopen(options));
  // Multiple L0 files force a fallback to the plain read-only DB.
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
  Close();

  // Full compaction
  Reopen(options);
  // Add more keys
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
  ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
  ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(3, NumTableFilesAtLevel(1));
  Close();

  // All data on a single level: compacted-DB mode engages.
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported in compacted db mode.");
  ASSERT_EQ("NOT_FOUND", Get("abc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
  ASSERT_EQ("NOT_FOUND", Get("ccc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
  ASSERT_EQ("NOT_FOUND", Get("ggg"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
  ASSERT_EQ("NOT_FOUND", Get("kkk"));

  // MultiGet in compacted-DB mode: hits and misses interleaved.
  std::vector<std::string> values;
  std::vector<Status> status_list = dbfull()->MultiGet(
      ReadOptions(),
      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
      &values);
  ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
  ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
  ASSERT_OK(status_list[0]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
  ASSERT_TRUE(status_list[1].IsNotFound());
  ASSERT_OK(status_list[2]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
  ASSERT_TRUE(status_list[3].IsNotFound());
  ASSERT_OK(status_list[4]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
  ASSERT_TRUE(status_list[5].IsNotFound());

  Reopen(options);
  // Add a key to the memtable; the unflushed write prevents compacted-DB
  // mode on the next read-only open.
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(),
            "Not implemented: Not supported operation in read only mode.");
}
212
// Once data reaches L2, reopening with num_levels = 1 must fail with
// InvalidArgument, while reopening with enough levels must succeed.
// Fix: TEST_WaitForFlushMemTable()/TEST_WaitForCompact() return Status;
// the results were silently dropped — wrap them in ASSERT_OK.
TEST_F(DBBasicTest, LevelLimitReopen) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, options);

  // Keep writing 1MB values until compaction pushes a file to L2.
  const std::string value(1024 * 1024, ' ');
  int i = 0;
  while (NumTableFilesAtLevel(2, 1) == 0) {
    ASSERT_OK(Put(1, Key(i++), value));
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  // Fewer levels than the DB already uses: open must be rejected.
  options.num_levels = 1;
  options.max_bytes_for_level_multiplier_additional.resize(1, 1);
  Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
  ASSERT_EQ(s.IsInvalidArgument(), true);
  ASSERT_EQ(s.ToString(),
            "Invalid argument: db has more levels than options.num_levels");

  options.num_levels = 10;
  options.max_bytes_for_level_multiplier_additional.resize(10, 1);
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
}
236 #endif // ROCKSDB_LITE
237
// Basic put/overwrite/delete visibility in a non-default column family,
// repeated across all option configurations ChangeOptions() cycles
// through.
TEST_F(DBBasicTest, PutDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
  } while (ChangeOptions());
}
249
// SingleDelete must hide a previously put key while leaving an
// unrelated key ("foo2") intact.
TEST_F(DBBasicTest, PutSingleDeleteGet) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_OK(Put(1, "foo2", "v2"));
    ASSERT_EQ("v2", Get(1, "foo2"));
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip FIFO and universal compaction because they do not apply to the
    // test case. Skip MergePut because single delete does not get removed
    // when it encounters a merge.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
265
// It is possible to produce empty flushes when using single deletes
// (the put and the single-delete annihilate each other). Tests that an
// empty flush does not cause issues.
// Fixes: removed unused local `Random rnd(301)`; Put/SingleDelete
// return Status and were silently dropped — wrap them in ASSERT_OK.
TEST_F(DBBasicTest, EmptyFlush) {
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_OK(Put(1, "a", Slice()));
    ASSERT_OK(SingleDelete(1, "a"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
    // Skip FIFO and universal compaction as they do not apply to the test
    // case. Skip MergePut because merges cannot be combined with single
    // deletions.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
287
// After a flush, the key must be readable from SST files, and must not
// leak into the default column family.
TEST_F(DBBasicTest, GetFromVersions) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
  } while (ChangeOptions());
}
297
298 #ifndef ROCKSDB_LITE
// A snapshot taken between two puts must keep serving the older value,
// both before and after the memtable is flushed.
TEST_F(DBBasicTest, GetSnapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    // Exercise both a short key and a long key.
    for (const std::string& key :
         {std::string("foo"), std::string(200, 'x')}) {
      ASSERT_OK(Put(1, key, "v1"));
      const Snapshot* snap = db_->GetSnapshot();
      ASSERT_OK(Put(1, key, "v2"));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, snap));
      ASSERT_OK(Flush(1));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, snap));
      db_->ReleaseSnapshot(snap);
    }
  } while (ChangeOptions());
}
319 #endif // ROCKSDB_LITE
320
// While this process holds the DB open, a second DB::Open on the same
// path must be rejected by the file lock.
TEST_F(DBBasicTest, CheckLock) {
  do {
    DB* second_db = nullptr;
    Options opts = CurrentOptions();
    ASSERT_OK(TryReopen(opts));

    // The second open attempt must fail.
    Status open_status = DB::Open(opts, dbname_, &second_db);
    ASSERT_NOK(open_status);
#ifdef OS_LINUX
    ASSERT_TRUE(open_status.ToString().find("lock hold by current process") !=
                std::string::npos);
#endif  // OS_LINUX
  } while (ChangeCompactOptions());
}
336
// Writes land in multiple memtables (min_write_buffer_number_to_merge
// is 3, so the first Flush(1) does not immediately persist); reads must
// still see all values, and the final flush must succeed.
TEST_F(DBBasicTest, FlushMultipleMemtable) {
  do {
    Options options = CurrentOptions();
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    options.max_write_buffer_number = 4;
    options.min_write_buffer_number_to_merge = 3;
    options.max_write_buffer_size_to_maintain = -1;
    CreateAndReopenWithCF({"pikachu"}, options);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));
    ASSERT_OK(Flush(1));
  } while (ChangeCompactOptions());
}
355
// Blocks both background thread pools with sleeping tasks and verifies
// that flushing an empty column family still succeeds, and that writes
// and reads are unaffected while the pools are blocked.
TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask sleeping_task_low;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask sleeping_task_high;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &sleeping_task_high, Env::Priority::HIGH);

  Options options = CurrentOptions();
  // disable compaction
  options.disable_auto_compactions = true;
  WriteOptions writeOpt = WriteOptions();
  writeOpt.disableWAL = true;
  options.max_write_buffer_number = 2;
  options.min_write_buffer_number_to_merge = 1;
  options.max_write_buffer_size_to_maintain =
      static_cast<int64_t>(options.write_buffer_size);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Flushing an empty memtable succeeds even though no background
  // thread is available to do the work.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Insert can go through
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
  ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

  ASSERT_EQ("v1", Get(0, "foo"));
  ASSERT_EQ("v1", Get(1, "bar"));

  // Unblock the HIGH-priority pool.
  sleeping_task_high.WakeUp();
  sleeping_task_high.WaitUntilDone();

  // Flush can still go through.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  sleeping_task_low.WakeUp();
  sleeping_task_low.WaitUntilDone();
}
400
// Covers flush interaction with the WAL: keys written with disableWAL
// survive a reopen only if they were flushed first, and perf-context
// counters confirm post-flush reads come from SST files.
TEST_F(DBBasicTest, Flush) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    WriteOptions writeOpt = WriteOptions();
    writeOpt.disableWAL = true;
    SetPerfLevel(kEnableTime);
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
    // this will now also flush the last 2 writes
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));

    get_perf_context()->Reset();
    Get(1, "foo");
    // "foo" was flushed, so the read must have touched an SST file.
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
    ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    writeOpt.disableWAL = true;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    ASSERT_EQ("v2", Get(1, "bar"));
    get_perf_context()->Reset();
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);

    writeOpt.disableWAL = false;
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
    ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
    // 'foo' should be there because its put
    // has WAL enabled.
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "bar"));

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
446
// A tiny max_manifest_file_size forces a new MANIFEST file on every
// version change (flush, reopen); the data must remain readable across
// the rollovers, observed via TEST_Current_Manifest_FileNo().
TEST_F(DBBasicTest, ManifestRollOver) {
  do {
    Options options;
    options.max_manifest_file_size = 10;  // 10 bytes
    options = CurrentOptions(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    {
      ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
      ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
      ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
      uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
      uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
      ASSERT_GT(manifest_after_flush, manifest_before_flush);
      ReopenWithColumnFamilies({"default", "pikachu"}, options);
      ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
      // check if a new manifest file got inserted or not.
      ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
      ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
      ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
    }
  } while (ChangeCompactOptions());
}
470
// The DB identity is stable across plain reopens; deleting the IDENTITY
// file regenerates it on reopen unless write_dbid_to_manifest has
// preserved the id in the MANIFEST.
TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    if (options.write_dbid_to_manifest) {
      ASSERT_EQ(id1.compare(id3), 0);
    } else {
      // id1 should NOT match id3 because identity was regenerated
      ASSERT_NE(id1.compare(id3), 0);
    }
  } while (ChangeCompactOptions());
}
496
// With write_dbid_to_manifest enabled, the DB identity survives even
// when the IDENTITY file is deleted, because it is recovered from the
// MANIFEST on reopen.
TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
  do {
    std::string id1;
    ASSERT_OK(db_->GetDbIdentity(id1));

    Options options = CurrentOptions();
    options.write_dbid_to_manifest = true;
    Reopen(options);
    std::string id2;
    ASSERT_OK(db_->GetDbIdentity(id2));
    // id1 should match id2 because identity was not regenerated
    ASSERT_EQ(id1.compare(id2), 0);

    std::string idfilename = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(idfilename));
    Reopen(options);
    std::string id3;
    ASSERT_OK(db_->GetDbIdentity(id3));
    // id1 SHOULD still match id3: the IDENTITY file was deleted, but
    // write_dbid_to_manifest preserved the id in the MANIFEST.
    ASSERT_EQ(id1, id3);
  } while (ChangeCompactOptions());
}
519
520 #ifndef ROCKSDB_LITE
// Exercises snapshot bookkeeping across two column families: snapshot
// count, oldest-snapshot timestamp/sequence tracking, reads at each
// snapshot, and the effect of releasing snapshots (including a
// scope-bound ManagedSnapshot).
TEST_F(DBBasicTest, Snapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    Put(0, "foo", "0v1");
    Put(1, "foo", "1v1");

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v2");
    Put(1, "foo", "1v2");

    // Advance the mock clock so s2 gets a later timestamp than s1.
    env_->addon_time_.fetch_add(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    // The oldest snapshot is still s1.
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    Put(0, "foo", "0v3");
    Put(1, "foo", "1v3");

    {
      // s3 is released automatically at the end of this scope.
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());

      Put(0, "foo", "0v4");
      Put(1, "foo", "1v4");
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    // Releasing s1 makes s2 the oldest snapshot.
    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions());
}
590
591 #endif // ROCKSDB_LITE
592
// Flush and compaction must drop only the versions of "foo" that are
// not pinned by a live snapshot; releasing snapshots allows subsequent
// compactions to drop more versions.
// Fixes: removed unused local `Random rnd(301)`; Put/CompactRange
// return Status and were silently dropped — wrap them in ASSERT_OK.
TEST_F(DBBasicTest, CompactBetweenSnapshots) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);
    FillLevels("a", "z", 1);

    ASSERT_OK(Put(1, "foo", "first"));
    const Snapshot* snapshot1 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "second"));
    ASSERT_OK(Put(1, "foo", "third"));
    ASSERT_OK(Put(1, "foo", "fourth"));
    const Snapshot* snapshot2 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "fifth"));
    ASSERT_OK(Put(1, "foo", "sixth"));

    // All entries (including duplicates) exist
    // before any compaction or flush is triggered.
    ASSERT_EQ(AllEntriesFor("foo", 1),
              "[ sixth, fifth, fourth, third, second, first ]");
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ("first", Get(1, "foo", snapshot1));

    // After a flush, "second", "third" and "fifth" should
    // be removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");

    // after we release the snapshot1, only two values left
    db_->ReleaseSnapshot(snapshot1);
    FillLevels("a", "z", 1);
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));

    // We have only one valid snapshot snapshot2. Since snapshot1 is
    // not valid anymore, "first" should be removed by a compaction.
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");

    // after we release the snapshot2, only one value should be left
    db_->ReleaseSnapshot(snapshot2);
    FillLevels("a", "z", 1);
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
  } while (ChangeOptions(kSkipFIFOCompaction));
}
646
// Verifies DB::Open's handling of create_if_missing and error_if_exists
// against a destroyed (non-existent) and then an existing database.
TEST_F(DBBasicTest, DBOpen_Options) {
  Options options = CurrentOptions();
  Close();
  Destroy(options);

  // Does not exist, and create_if_missing == false: error
  DB* db = nullptr;
  options.create_if_missing = false;
  Status s = DB::Open(options, dbname_, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does not exist, and create_if_missing == true: OK
  options.create_if_missing = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;

  // Does exist, and error_if_exists == true: error
  options.create_if_missing = false;
  options.error_if_exists = true;
  s = DB::Open(options, dbname_, &db);
  ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
  ASSERT_TRUE(db == nullptr);

  // Does exist, and error_if_exists == false: OK
  options.create_if_missing = true;
  options.error_if_exists = false;
  s = DB::Open(options, dbname_, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  delete db;
  db = nullptr;
}
685
// Verifies which entries (values and deletion markers) survive memtable
// flush and compaction with and without live snapshots, inspecting the
// raw entry stack for a key via AllEntriesFor.
// Fixes: ignored Status results of Put/Delete/Flush/CompactRange are
// now checked with ASSERT_OK; the second "Case 5" label was a
// duplicate and is corrected to "Case 6".
TEST_F(DBBasicTest, CompactOnFlush) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");

    // Write two new keys
    ASSERT_OK(Put(1, "a", "begin"));
    ASSERT_OK(Put(1, "z", "end"));
    ASSERT_OK(Flush(1));

    // Case1: Delete followed by a put
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");

    // After the current memtable is flushed, the DEL should
    // have been removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");

    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");

    // Case 2: Delete followed by another delete
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 3: Put followed by a delete
    ASSERT_OK(Put(1, "foo", "v3"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 4: Put followed by another Put
    ASSERT_OK(Put(1, "foo", "v4"));
    ASSERT_OK(Put(1, "foo", "v5"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");

    // clear database
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 5: Put followed by snapshot followed by another Put
    // Both puts should remain.
    ASSERT_OK(Put(1, "foo", "v6"));
    const Snapshot* snapshot = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "v7"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
    db_->ReleaseSnapshot(snapshot);

    // clear database
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 6: snapshot followed by a put followed by another Put
    // Only the last put should remain.
    const Snapshot* snapshot1 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "v8"));
    ASSERT_OK(Put(1, "foo", "v9"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
    db_->ReleaseSnapshot(snapshot1);
  } while (ChangeCompactOptions());
}
778
// Flushing one column family at a time must produce exactly one new
// table file per flush.
// Fix: Flush(i) returns Status; the result was silently dropped — wrap
// it in ASSERT_OK.
TEST_F(DBBasicTest, FlushOneColumnFamily) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  ASSERT_OK(Put(0, "Default", "Default"));
  ASSERT_OK(Put(1, "pikachu", "pikachu"));
  ASSERT_OK(Put(2, "ilya", "ilya"));
  ASSERT_OK(Put(3, "muromec", "muromec"));
  ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
  ASSERT_OK(Put(5, "nikitich", "nikitich"));
  ASSERT_OK(Put(6, "alyosha", "alyosha"));
  ASSERT_OK(Put(7, "popovich", "popovich"));

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    // Each flush adds exactly one table file.
    auto tables = ListTableFiles(env_, dbname_);
    ASSERT_EQ(tables.size(), i + 1U);
  }
}
800
// MultiGet over a mix of live keys and deleted/absent keys: found keys
// return OK with their values, the rest return NotFound; the perf
// context counts only the bytes of values actually read.
TEST_F(DBBasicTest, MultiGetSimple) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    std::vector<Slice> lookup_keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<std::string> fetched(20, "Temporary data to be overwritten");
    std::vector<ColumnFamilyHandle*> target_cfs(lookup_keys.size(),
                                                handles_[1]);

    get_perf_context()->Reset();
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), target_cfs, lookup_keys, &fetched);
    ASSERT_EQ(fetched.size(), lookup_keys.size());
    ASSERT_EQ(fetched[0], "v1");
    ASSERT_EQ(fetched[1], "v2");
    ASSERT_EQ(fetched[2], "v3");
    ASSERT_EQ(fetched[4], "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    // k1, k2, k3 and k5 hit; k4 was deleted and no_key never existed.
    for (size_t hit : {0, 1, 2, 4}) {
      ASSERT_OK(statuses[hit]);
    }
    ASSERT_TRUE(statuses[3].IsNotFound());
    ASSERT_TRUE(statuses[5].IsNotFound());
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
837
// MultiGet with an empty key set returns an empty status vector, both
// on a populated DB and a freshly created one; lookups of keys that
// were never written return NotFound.
TEST_F(DBBasicTest, MultiGetEmpty) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    // Empty key set against the current database.
    std::vector<Slice> lookup_keys;
    std::vector<std::string> fetched;
    std::vector<ColumnFamilyHandle*> target_cfs;
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), target_cfs, lookup_keys, &fetched);
    ASSERT_EQ(statuses.size(), 0U);

    // Empty key set against an empty, freshly created database.
    Options options = CurrentOptions();
    options.create_if_missing = true;
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);
    statuses = db_->MultiGet(ReadOptions(), target_cfs, lookup_keys, &fetched);
    ASSERT_EQ(statuses.size(), 0U);

    // Two never-written keys against the empty database: both miss.
    lookup_keys.resize(2);
    lookup_keys[0] = "a";
    lookup_keys[1] = "b";
    target_cfs.push_back(handles_[0]);
    target_cfs.push_back(handles_[1]);
    statuses = db_->MultiGet(ReadOptions(), target_cfs, lookup_keys, &fetched);
    ASSERT_EQ(static_cast<int>(statuses.size()), 2);
    ASSERT_TRUE(statuses[0].IsNotFound() && statuses[1].IsNotFound());
  } while (ChangeCompactOptions());
}
867
// Writes one SST file with each supported checksum type, then reopens
// with every checksum setting in turn and verifies all keys remain
// readable regardless of the currently configured type.
TEST_F(DBBasicTest, ChecksumTest) {
  BlockBasedTableOptions table_options;
  Options options = CurrentOptions();
  // change when new checksum type added
  int max_checksum = static_cast<int>(kxxHash64);
  const int kNumPerFile = 2;

  // generate one table with each type of checksum
  for (int i = 0; i <= max_checksum; ++i) {
    table_options.checksum = static_cast<ChecksumType>(i);
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    for (int j = 0; j < kNumPerFile; ++j) {
      ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
    }
    ASSERT_OK(Flush());
  }

  // with each valid checksum type setting...
  for (int i = 0; i <= max_checksum; ++i) {
    table_options.checksum = static_cast<ChecksumType>(i);
    options.table_factory.reset(NewBlockBasedTableFactory(table_options));
    Reopen(options);
    // verify every type of checksum (should be regardless of that setting)
    for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
      ASSERT_EQ(Key(j), Get(Key(j)));
    }
  }
}
897
898 // On Windows you can have either memory mapped file or a file
899 // with unbuffered access. So this asserts and does not make
900 // sense to run
901 #ifndef OS_WIN
// use_direct_reads and allow_mmap_reads cannot be combined: opening
// with both set must fail, while every other combination must succeed.
TEST_F(DBBasicTest, MmapAndBufferOptions) {
  if (!IsMemoryMappedAccessSupported()) {
    return;
  }
  Options options = CurrentOptions();

  // Direct I/O together with mmap reads is rejected.
  options.use_direct_reads = true;
  options.allow_mmap_reads = true;
  ASSERT_NOK(TryReopen(options));

  // All other combinations are acceptable
  options.use_direct_reads = false;
  ASSERT_OK(TryReopen(options));

  if (IsDirectIOSupported()) {
    options.use_direct_reads = true;
    options.allow_mmap_reads = false;
    ASSERT_OK(TryReopen(options));
  }

  options.use_direct_reads = false;
  ASSERT_OK(TryReopen(options));
}
925 #endif
926
927 class TestEnv : public EnvWrapper {
928 public:
TestEnv(Env * base_env)929 explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
930
931 class TestLogger : public Logger {
932 public:
933 using Logger::Logv;
TestLogger(TestEnv * env_ptr)934 explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger()935 ~TestLogger() override {
936 if (!closed_) {
937 CloseHelper();
938 }
939 }
Logv(const char *,va_list)940 void Logv(const char* /*format*/, va_list /*ap*/) override {}
941
942 protected:
CloseImpl()943 Status CloseImpl() override { return CloseHelper(); }
944
945 private:
CloseHelper()946 Status CloseHelper() {
947 env->CloseCountInc();
948 ;
949 return Status::IOError();
950 }
951 TestEnv* env;
952 };
953
CloseCountInc()954 void CloseCountInc() { close_count++; }
955
GetCloseCount()956 int GetCloseCount() { return close_count; }
957
NewLogger(const std::string &,std::shared_ptr<Logger> * result)958 Status NewLogger(const std::string& /*fname*/,
959 std::shared_ptr<Logger>* result) override {
960 result->reset(new TestLogger(this));
961 return Status::OK();
962 }
963
964 private:
965 int close_count;
966 };
967
TEST_F(DBBasicTest, DBClose) {
  // Verifies when the info log is closed: DB::Close() and DB destruction must
  // close an env-created logger exactly once, and must not close a logger the
  // caller supplied via options.info_log.
  Options options = GetDefaultOptions();
  std::string dbname = test::PerThreadDBPath("db_close_test");
  ASSERT_OK(DestroyDB(dbname, options));

  DB* db = nullptr;
  TestEnv* env = new TestEnv(env_);
  std::unique_ptr<TestEnv> local_env_guard(env);
  options.create_if_missing = true;
  options.env = env;
  Status s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  // Close() closes the env-provided logger once and surfaces its IOError.
  s = db->Close();
  ASSERT_EQ(env->GetCloseCount(), 1);
  ASSERT_EQ(s, Status::IOError());

  // Deleting an already-closed DB must not close the logger a second time.
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 1);

  // Do not call DB::Close() and ensure our logger Close() still gets called
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);

  // Provide our own logger and ensure DB::Close() does not close it
  options.info_log.reset(new TestEnv::TestLogger(env));
  options.create_if_missing = false;
  s = DB::Open(options, dbname, &db);
  ASSERT_OK(s);
  ASSERT_TRUE(db != nullptr);

  // Close() succeeds and leaves the caller-owned logger open.
  s = db->Close();
  ASSERT_EQ(s, Status::OK());
  delete db;
  ASSERT_EQ(env->GetCloseCount(), 2);
  // Dropping the last reference destroys the logger, whose destructor
  // performs the close.
  options.info_log.reset();
  ASSERT_EQ(env->GetCloseCount(), 3);
}
1010
TEST_F(DBBasicTest, DBCloseFlushError) {
  // DB::Close() must return a non-OK status when the filesystem fails while
  // it is persisting pending data (fault injected below).
  std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  // Manual WAL flush leaves unpersisted WAL data for Close() to deal with.
  options.manual_wal_flush = true;
  options.write_buffer_size = 100;
  options.env = fault_injection_env.get();

  Reopen(options);
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(Put("key3", "value3"));
  // Make all subsequent filesystem operations fail, then close.
  fault_injection_env->SetFilesystemActive(false);
  Status s = dbfull()->Close();
  fault_injection_env->SetFilesystemActive(true);
  ASSERT_NE(s, Status::OK());

  Destroy(options);
}
1032
// Bool param is forwarded as the last argument of the MultiGet() test helper;
// presumably it selects between the batched and non-batched MultiGet API —
// confirm against DBTestBase::MultiGet().
class DBMultiGetTestWithParam : public DBBasicTest,
                                public testing::WithParamInterface<bool> {};
1035
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
  // MultiGet across 8 column families while a sync-point callback flushes and
  // overwrites the keys mid-call, forcing MultiGet to re-acquire
  // super-versions and still return a consistent view.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);
  // <CF, key, value> tuples
  std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  static const int num_keys = 24;
  cf_kv_vec.reserve(num_keys);

  // Three distinct keys per column family.
  for (int i = 0; i < num_keys; ++i) {
    int cf = i / 3;
    // Fix: was `1 % 3` (the constant 1), which collapsed all of a CF's keys
    // into a single key.
    int cf_key = i % 3;
    cf_kv_vec.emplace_back(std::make_tuple(
        cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
        "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
    ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                  std::get<2>(cf_kv_vec[i])));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
          // is forced to repeat the process
          for (int i = 0; i < num_keys; ++i) {
            int cf = i / 3;
            int cf_key = i % 8;
            if (cf_key == 0) {
              ASSERT_OK(Flush(cf));
            }
            ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                          std::get<2>(cf_kv_vec[i]) + "_2"));
          }
        }
        if (get_sv_count == 11) {
          // At this point MultiGet is expected to have every CF's
          // thread-local super-version checked out.
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < num_keys; ++i) {
    cfs.push_back(std::get<0>(cf_kv_vec[i]));
    keys.push_back(std::get<1>(cf_kv_vec[i]));
  }

  // Every result must reflect the "_2" overwrites made by the callback.
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values.size(), num_keys);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  }

  // Smaller lookups spanning multiple CFs in arbitrary order.
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[0]));
  keys.push_back(std::get<1>(cf_kv_vec[0]));
  cfs.push_back(std::get<0>(cf_kv_vec[3]));
  keys.push_back(std::get<1>(cf_kv_vec[3]));
  cfs.push_back(std::get<0>(cf_kv_vec[4]));
  keys.push_back(std::get<1>(cf_kv_vec[4]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");

  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[7]));
  keys.push_back(std::get<1>(cf_kv_vec[7]));
  cfs.push_back(std::get<0>(cf_kv_vec[6]));
  keys.push_back(std::get<1>(cf_kv_vec[6]));
  cfs.push_back(std::get<0>(cf_kv_vec[1]));
  keys.push_back(std::get<1>(cf_kv_vec[1]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");

  // No CF's thread-local super-version may be left in-use or obsolete after
  // MultiGet returns.
  for (int cf = 0; cf < 8; ++cf) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  }
}
1133
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
  // Repeatedly invalidates super-versions during MultiGet until the
  // implementation reaches its final attempt (the "LastTry" sync point; the
  // test name suggests this path runs under the DB mutex — confirm in
  // DBImpl::MultiGet).
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  int retries = 0;
  bool last_try = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
        // Once the last-try path is reached, stop interfering so the call
        // can complete.
        last_try = true;
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (last_try) {
          return;
        }
        // Every second super-version acquisition, flush and overwrite all
        // CFs so the already-acquired super-versions go stale and MultiGet
        // must retry.
        if (++get_sv_count == 2) {
          ++retries;
          get_sv_count = 0;
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(
                i, "cf" + std::to_string(i) + "_key",
                "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_TRUE(last_try);
  ASSERT_EQ(values.size(), 8);
  // The values returned must be those written just before the successful
  // final attempt.
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j],
              "cf" + std::to_string(j) + "_val" + std::to_string(retries));
  }
  // No CF's thread-local super-version should remain marked in-use.
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
1194
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  // MultiGet at an explicit snapshot: overwrites performed concurrently by
  // the sync-point callback must not leak into the results.
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int i = 0; i < 8; ++i) {
    ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                  "cf" + std::to_string(i) + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        // After a couple of super-version refs, flush and overwrite every CF
        // so data newer than the snapshot exists during the MultiGet.
        if (++get_sv_count == 2) {
          for (int i = 0; i < 8; ++i) {
            ASSERT_OK(Flush(i));
            ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
                          "cf" + std::to_string(i) + "_val2"));
          }
        }
        if (get_sv_count == 8) {
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            // Each thread-local super-version is either still checked out by
            // this MultiGet or was invalidated by the flushes above.
            ASSERT_TRUE(
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
                (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < 8; ++i) {
    cfs.push_back(i);
    keys.push_back("cf" + std::to_string(i) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, GetParam());
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  // The snapshot predates the "_val2" overwrites, so the original values
  // must be returned.
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }
  // No super-version may be left marked in-use once MultiGet returns.
  for (int i = 0; i < 8; ++i) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
1253
1254 INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
1255 testing::Bool());
1256
TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
  // Batched MultiGet with keys deliberately not in sorted order; the final
  // argument (sorted_input) is false, so the implementation must handle the
  // ordering itself.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    // Fix: removed an unused `cfs` vector; the single-CF overload below does
    // not take one.
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
    ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    // "no_key" and the deleted "k4" must report NotFound.
    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);
    ASSERT_OK(s[5]);

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
1297
TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
  // Batched MultiGet with keys already sorted; the final argument
  // (sorted_input) is true, so the implementation may skip sorting.
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    // Fix: removed an unused `cfs` vector; the single-CF overload below does
    // not take one.
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), true);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);

    // The deleted "k4" and "no_key" must report NotFound.
    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
1338
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
  // Lays out overlapping versions of keys across L2, L1, L0 and the
  // memtable, then checks that MultiGet returns the newest version of each
  // key.
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);
  int num_keys = 0;

  // Writes "key_<i>" = "<val_prefix><i>" for i in [0, 128) stepping by
  // `step`, flushing every 8 keys and once more at the end.
  // (Also fixes previously unchecked Flush() statuses.)
  auto fill_level = [&](const std::string& val_prefix, int step) {
    for (int i = 0; i < 128; i += step) {
      ASSERT_OK(
          Put("key_" + std::to_string(i), val_prefix + std::to_string(i)));
      num_keys++;
      if (num_keys == 8) {
        ASSERT_OK(Flush());
        num_keys = 0;
      }
    }
    if (num_keys > 0) {
      ASSERT_OK(Flush());
      num_keys = 0;
    }
  };

  // Base versions for every key end up in L2.
  fill_level("val_l2_", 1);
  MoveFilesToLevel(2);

  // Newer versions for every third key in L1.
  fill_level("val_l1_", 3);
  MoveFilesToLevel(1);

  // Even newer versions for every fifth key stay in L0.
  fill_level("val_l0_", 5);
  ASSERT_EQ(0, num_keys);

  // Newest versions for every ninth key live in the memtable.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 64; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), 16);
  // For each key, the newest layer that wrote it must win.
  for (unsigned int j = 0; j < values.size(); ++j) {
    int key = j + 64;
    if (key % 9 == 0) {
      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
    } else if (key % 5 == 0) {
      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
    } else if (key % 3 == 0) {
      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
    } else {
      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
    }
  }
}
1413
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
  // Builds merge chains for keys spanning L2 (base puts), L1/L0 (merge
  // operands) and the memtable, then checks MultiGet returns the full
  // comma-joined merge result for each key.
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);
  int num_keys = 0;

  // Base values for every key, flushed in batches of 8 and moved to L2.
  // NOTE(review): Flush() statuses are not checked in this test.
  for (int i = 0; i < 128; ++i) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  MoveFilesToLevel(2);

  // Merge operands for every third key, moved to L1.
  for (int i = 0; i < 128; i += 3) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  MoveFilesToLevel(1);

  // Merge operands for every fifth key, left in L0.
  for (int i = 0; i < 128; i += 5) {
    ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
    num_keys++;
    if (num_keys == 8) {
      Flush();
      num_keys = 0;
    }
  }
  if (num_keys > 0) {
    Flush();
    num_keys = 0;
  }
  ASSERT_EQ(0, num_keys);

  // Merge operands for every ninth key stay in the memtable.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(
        Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 32; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), keys.size());
  // Expected value is the base L2 value with each applicable merge operand
  // appended (string-append operator joins with ',').
  for (unsigned int j = 0; j < 48; ++j) {
    int key = j + 32;
    std::string value;
    value.append("val_l2_" + std::to_string(key));
    if (key % 3 == 0) {
      value.append(",");
      value.append("val_l1_" + std::to_string(key));
    }
    if (key % 5 == 0) {
      value.append(",");
      value.append("val_l0_" + std::to_string(key));
    }
    if (key % 9 == 0) {
      value.append(",");
      value.append("val_mem_" + std::to_string(key));
    }
    ASSERT_EQ(values[j], value);
  }
}
1499
1500 // Test class for batched MultiGet with prefix extractor
1501 // Param bool - If true, use partitioned filters
1502 // If false, use full filter block
// GetParam() == true selects partitioned filters; false selects a full
// filter block (see the comment above).
class MultiGetPrefixExtractorTest : public DBBasicTest,
                                    public ::testing::WithParamInterface<bool> {
};
1506
TEST_P(MultiGetPrefixExtractorTest, Batched) {
  // Batched MultiGet with a fixed 2-byte prefix extractor: checks both that
  // all stored keys are returned and that the memtable/SST bloom filter
  // counters match the expected hit/miss pattern.
  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  options.memtable_prefix_bloom_size_ratio = 10;
  BlockBasedTableOptions bbto;
  if (GetParam()) {
    // Partitioned-filter variant.
    bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
    bbto.partition_filters = true;
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  bbto.cache_index_and_filter_blocks = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  SetPerfLevel(kEnableCount);
  get_perf_context()->Reset();

  // First key is not in the prefix_extractor domain
  ASSERT_OK(Put("k", "v0"));
  ASSERT_OK(Put("kk1", "v1"));
  ASSERT_OK(Put("kk2", "v2"));
  ASSERT_OK(Put("kk3", "v3"));
  ASSERT_OK(Put("kk4", "v4"));
  // "rofl" and "lmho" are absent keys with prefixes never written.
  std::vector<std::string> mem_keys(
      {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
  std::vector<std::string> inmem_values;
  inmem_values = MultiGet(mem_keys, nullptr);
  ASSERT_EQ(inmem_values[0], "v0");
  ASSERT_EQ(inmem_values[1], "v1");
  ASSERT_EQ(inmem_values[2], "v2");
  ASSERT_EQ(inmem_values[3], "v3");
  ASSERT_EQ(inmem_values[4], "v4");
  // The two absent prefixes miss the memtable bloom; the other five probes
  // count as hits.
  ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
  ASSERT_OK(Flush());

  std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
  std::vector<std::string> values;
  get_perf_context()->Reset();
  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values[0], "v0");
  ASSERT_EQ(values[1], "v1");
  ASSERT_EQ(values[2], "v2");
  ASSERT_EQ(values[3], "v3");
  ASSERT_EQ(values[4], "v4");
  // Filter hits for 4 in-domain keys
  ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
}
1556
1557 INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
1558 ::testing::Bool());
1559
1560 #ifndef ROCKSDB_LITE
// GetParam() == true -> MultiGet reads are performed at explicit snapshots
// (see `use_snapshots` in the test body).
class DBMultiGetRowCacheTest : public DBBasicTest,
                               public ::testing::WithParamInterface<bool> {};
1563
TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
  // Verifies that batched MultiGet populates and then hits the row cache,
  // optionally reading at explicit snapshots (per the test parameter).
  do {
    option_config_ = kRowCache;
    Options options = CurrentOptions();
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    CreateAndReopenWithCF({"pikachu"}, options);
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    Flush(1);
    ASSERT_OK(Put(1, "k5", "v5"));
    // snap1: sees k1-k5 (k4 still present). snap2: sees the delete of k4.
    const Snapshot* snap1 = dbfull()->GetSnapshot();
    ASSERT_OK(Delete(1, "k4"));
    Flush(1);
    const Snapshot* snap2 = dbfull()->GetSnapshot();

    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    // Fix: removed an unused `cfs` vector; the single-CF overload below does
    // not take one.
    std::vector<Status> s(keys.size());

    ReadOptions ro;
    bool use_snapshots = GetParam();
    if (use_snapshots) {
      ro.snapshot = snap2;
    }
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
    ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // three found values * two bytes per value
    ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);

    // Call MultiGet() again with some intersection with the previous set of
    // keys. Those should already be in the row cache.
    keys.assign({"no_key", "k5", "k3", "k2"});
    for (size_t i = 0; i < keys.size(); ++i) {
      values[i].Reset();
      s[i] = Status::OK();
    }
    get_perf_context()->Reset();

    if (use_snapshots) {
      ro.snapshot = snap1;
    }
    // Fix: pass `ro` (not a default-constructed ReadOptions), otherwise the
    // snapshot set just above is never used.
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), false);

    ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
    ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
    ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
    // three found values * two bytes per value
    ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_OK(s[3]);
    if (use_snapshots) {
      // Only reads from the first SST file would have been cached, since
      // snapshot seq no is > fd.largest_seqno
      ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
    } else {
      ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
    }

    SetPerfLevel(kDisable);
    dbfull()->ReleaseSnapshot(snap1);
    dbfull()->ReleaseSnapshot(snap2);
  } while (ChangeCompactOptions());
}
1648
1649 INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
1650 testing::Values(true, false));
1651
TEST_F(DBBasicTest, GetAllKeyVersions) {
  // GetAllKeyVersions must report every version of every key — original
  // puts, updates, and deletes — for both the default and a non-default
  // column family.
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());
  const size_t kNumInserts = 4;
  const size_t kNumDeletes = 4;
  const size_t kNumUpdates = 4;

  // Check default column family
  for (size_t i = 0; i != kNumInserts; ++i) {
    ASSERT_OK(Put(std::to_string(i), "value"));
  }
  for (size_t i = 0; i != kNumUpdates; ++i) {
    ASSERT_OK(Put(std::to_string(i), "value1"));
  }
  for (size_t i = 0; i != kNumDeletes; ++i) {
    ASSERT_OK(Delete(std::to_string(i)));
  }
  std::vector<KeyVersion> key_versions;
  // Query once via the default-CF overload and once with an explicit handle;
  // both must see all 12 versions.
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
      &key_versions));
  ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
      &key_versions));
  ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());

  // Check non-default column family
  // One fewer of each operation, so the expected count drops by 3.
  for (size_t i = 0; i != kNumInserts - 1; ++i) {
    ASSERT_OK(Put(1, std::to_string(i), "value"));
  }
  for (size_t i = 0; i != kNumUpdates - 1; ++i) {
    ASSERT_OK(Put(1, std::to_string(i), "value1"));
  }
  for (size_t i = 0; i != kNumDeletes - 1; ++i) {
    ASSERT_OK(Delete(1, std::to_string(i)));
  }
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
      &key_versions));
  ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
}
1698 #endif // !ROCKSDB_LITE
1699
TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
  // Uses a block size larger than MultiGet's stack read buffer so block reads
  // are forced off the stack buffer during the MultiGet below.
  Options options = CurrentOptions();
  Random rnd(301);
  BlockBasedTableOptions table_options;
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.block_size = 16 * 1024;
  ASSERT_TRUE(table_options.block_size >
              BlockBasedTable::kMultiGetReadStackBufSize);
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  Reopen(options);

  std::string zero_str(128, '\0');
  for (int i = 0; i < 100; ++i) {
    // Make the value compressible. A purely random string doesn't compress
    // and the resultant data block will not be compressed
    std::string value(RandomString(&rnd, 128) + zero_str);
    // Fix: was `assert(Put(...) == Status::OK())`, which is compiled out —
    // and the Put never executed — in NDEBUG builds.
    ASSERT_OK(Put(Key(i), value));
  }
  ASSERT_OK(Flush());

  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
}
1738
TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
  // With the WAL disabled, all data is flushed to SST files before Close();
  // reopening with best_efforts_recovery must then find every key intact.
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  WriteOptions write_opts;
  write_opts.disableWAL = true;
  for (size_t cf = 0; cf != num_cfs; ++cf) {
    for (size_t i = 0; i != 10000; ++i) {
      std::string key_str = Key(static_cast<int>(i));
      std::string value_str = std::to_string(cf) + "_" + std::to_string(i);

      ASSERT_OK(Put(static_cast<int>(cf), key_str, value_str));
      // Flush periodically so data is spread across multiple SST files.
      if (0 == (i % 1000)) {
        ASSERT_OK(Flush(static_cast<int>(cf)));
      }
    }
  }
  // Final flush so nothing remains only in the memtables (the WAL is off).
  for (size_t cf = 0; cf != num_cfs; ++cf) {
    ASSERT_OK(Flush(static_cast<int>(cf)));
  }
  Close();
  options.best_efforts_recovery = true;
  ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
                           options);
  num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  // All 10000 keys per CF must be recovered with their original values.
  for (size_t cf = 0; cf != num_cfs; ++cf) {
    for (int i = 0; i != 10000; ++i) {
      std::string key_str = Key(static_cast<int>(i));
      std::string expected_value_str =
          std::to_string(cf) + "_" + std::to_string(i);
      ASSERT_EQ(expected_value_str, Get(static_cast<int>(cf), key_str));
    }
  }
}
1776
1777 #ifndef ROCKSDB_LITE
1778 namespace {
// Records the path of every table file created, keyed by column family name,
// so tests can later locate and delete specific SST files.
class TableFileListener : public EventListener {
 public:
  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    InstrumentedMutexLock lock(&mutex_);
    cf_to_paths_[info.cf_name].push_back(info.file_path);
  }
  // Paths of files created for `cf_name`, in creation order.
  // NOTE(review): returns a reference into the map after releasing the
  // mutex; safe only because tests call this after all file creation is
  // done.
  std::vector<std::string>& GetFiles(const std::string& cf_name) {
    InstrumentedMutexLock lock(&mutex_);
    return cf_to_paths_[cf_name];
  }

 private:
  InstrumentedMutex mutex_;
  std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
};
1794 } // namespace
1795
TEST_F(DBBasicTest, RecoverWithMissingFiles) {
  // Deletes a different number of SST files from each CF, then checks that
  // best-efforts recovery reopens the DB exposing exactly the data whose
  // files survived.
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  TableFileListener* listener = new TableFileListener();
  // Disable auto compaction to simplify SST file name tracking.
  options.disable_auto_compactions = true;
  options.listeners.emplace_back(listener);
  CreateAndReopenWithCF({"pikachu", "eevee"}, options);
  std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
                                           "eevee"};
  size_t num_cfs = handles_.size();
  ASSERT_EQ(3, num_cfs);
  // Three single-key SST files per CF: "a", then "b", then "c".
  for (size_t cf = 0; cf != num_cfs; ++cf) {
    ASSERT_OK(Put(static_cast<int>(cf), "a", "0_value"));
    ASSERT_OK(Flush(static_cast<int>(cf)));
    ASSERT_OK(Put(static_cast<int>(cf), "b", "0_value"));
    ASSERT_OK(Flush(static_cast<int>(cf)));
    ASSERT_OK(Put(static_cast<int>(cf), "c", "0_value"));
    ASSERT_OK(Flush(static_cast<int>(cf)));
  }

  // Delete files
  // For CF index i, delete all but the oldest i files: the default CF keeps
  // none, "pikachu" keeps the "a" file, "eevee" keeps "a" and "b".
  for (size_t i = 0; i < all_cf_names.size(); ++i) {
    std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
    ASSERT_EQ(3, files.size());
    for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
         --j) {
      ASSERT_OK(env_->DeleteFile(files[j]));
    }
  }
  options.best_efforts_recovery = true;
  ReopenWithColumnFamilies(all_cf_names, options);
  // Verify data
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  {
    // Default CF lost all its files: no keys.
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
    iter->SeekToFirst();
    ASSERT_FALSE(iter->Valid());
    // "pikachu" kept only the file containing "a".
    iter.reset(db_->NewIterator(read_opts, handles_[1]));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("a", iter->key());
    iter->Next();
    ASSERT_FALSE(iter->Valid());
    // "eevee" kept the files containing "a" and "b".
    iter.reset(db_->NewIterator(read_opts, handles_[2]));
    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("a", iter->key());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ("b", iter->key());
    iter->Next();
    ASSERT_FALSE(iter->Valid());
  }
}
1852
// Best-efforts recovery must not replay WAL entries for column families whose
// table files are missing: after deleting the default CF's only SST file, the
// unflushed WAL write ("b") must not reappear on reopen either.
TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  // Ownership passes to options.listeners (shared_ptr) below.
  TableFileListener* listener = new TableFileListener();
  options.listeners.emplace_back(listener);
  CreateAndReopenWithCF({"pikachu"}, options);
  std::vector<std::string> kAllCfNames = {kDefaultColumnFamilyName, "pikachu"};
  size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  // Per CF: one flushed key ("a" -> SST) and one unflushed key ("b" -> WAL).
  for (int cf = 0; cf < static_cast<int>(kAllCfNames.size()); ++cf) {
    ASSERT_OK(Put(cf, "a", "0_value"));
    ASSERT_OK(Flush(cf));
    ASSERT_OK(Put(cf, "b", "0_value"));
  }
  // Delete files: only the default CF (i == 0) loses its single SST file;
  // "pikachu" (i == 1) keeps its file since the loop body never runs.
  for (size_t i = 0; i < kAllCfNames.size(); ++i) {
    std::vector<std::string>& files = listener->GetFiles(kAllCfNames[i]);
    ASSERT_EQ(1, files.size());
    for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
         --j) {
      ASSERT_OK(env_->DeleteFile(files[j]));
    }
  }
  options.best_efforts_recovery = true;
  ReopenWithColumnFamilies(kAllCfNames, options);
  // Verify WAL is not applied
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  // Default CF: SST gone and WAL skipped, so nothing is visible.
  std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
  iter->SeekToFirst();
  ASSERT_FALSE(iter->Valid());
  // "pikachu": only the flushed "a" survives; WAL entry "b" is dropped too.
  iter.reset(db_->NewIterator(read_opts, handles_[1]));
  iter->SeekToFirst();
  ASSERT_TRUE(iter->Valid());
  ASSERT_EQ("a", iter->key());
  iter->Next();
  ASSERT_FALSE(iter->Valid());
}
1891 #endif // !ROCKSDB_LITE
1892
1893 class DBBasicTestWithParallelIO
1894 : public DBTestBase,
1895 public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
1896 public:
DBBasicTestWithParallelIO()1897 DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
1898 bool compressed_cache = std::get<0>(GetParam());
1899 bool uncompressed_cache = std::get<1>(GetParam());
1900 compression_enabled_ = std::get<2>(GetParam());
1901 fill_cache_ = std::get<3>(GetParam());
1902
1903 if (compressed_cache) {
1904 std::shared_ptr<Cache> cache = NewLRUCache(1048576);
1905 compressed_cache_ = std::make_shared<MyBlockCache>(cache);
1906 }
1907 if (uncompressed_cache) {
1908 std::shared_ptr<Cache> cache = NewLRUCache(1048576);
1909 uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
1910 }
1911
1912 env_->count_random_reads_ = true;
1913
1914 Options options = CurrentOptions();
1915 Random rnd(301);
1916 BlockBasedTableOptions table_options;
1917
1918 #ifndef ROCKSDB_LITE
1919 if (compression_enabled_) {
1920 std::vector<CompressionType> compression_types;
1921 compression_types = GetSupportedCompressions();
1922 // Not every platform may have compression libraries available, so
1923 // dynamically pick based on what's available
1924 CompressionType tmp_type = kNoCompression;
1925 for (auto c_type : compression_types) {
1926 if (c_type != kNoCompression) {
1927 tmp_type = c_type;
1928 break;
1929 }
1930 }
1931 if (tmp_type != kNoCompression) {
1932 options.compression = tmp_type;
1933 } else {
1934 compression_enabled_ = false;
1935 }
1936 }
1937 #else
1938 // GetSupportedCompressions() is not available in LITE build
1939 if (!Snappy_Supported()) {
1940 compression_enabled_ = false;
1941 }
1942 #endif // ROCKSDB_LITE
1943
1944 table_options.block_cache = uncompressed_cache_;
1945 if (table_options.block_cache == nullptr) {
1946 table_options.no_block_cache = true;
1947 } else {
1948 table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1949 }
1950 table_options.block_cache_compressed = compressed_cache_;
1951 table_options.flush_block_policy_factory.reset(
1952 new MyFlushBlockPolicyFactory());
1953 options.table_factory.reset(new BlockBasedTableFactory(table_options));
1954 if (!compression_enabled_) {
1955 options.compression = kNoCompression;
1956 }
1957 Reopen(options);
1958
1959 std::string zero_str(128, '\0');
1960 for (int i = 0; i < 100; ++i) {
1961 // Make the value compressible. A purely random string doesn't compress
1962 // and the resultant data block will not be compressed
1963 values_.emplace_back(RandomString(&rnd, 128) + zero_str);
1964 assert(Put(Key(i), values_[i]) == Status::OK());
1965 }
1966 Flush();
1967
1968 for (int i = 0; i < 100; ++i) {
1969 // block cannot gain space by compression
1970 uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
1971 std::string tmp_key = "a" + Key(i);
1972 assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
1973 }
1974 Flush();
1975 }
1976
CheckValue(int i,const std::string & value)1977 bool CheckValue(int i, const std::string& value) {
1978 if (values_[i].compare(value) == 0) {
1979 return true;
1980 }
1981 return false;
1982 }
1983
CheckUncompressableValue(int i,const std::string & value)1984 bool CheckUncompressableValue(int i, const std::string& value) {
1985 if (uncompressable_values_[i].compare(value) == 0) {
1986 return true;
1987 }
1988 return false;
1989 }
1990
num_lookups()1991 int num_lookups() { return uncompressed_cache_->num_lookups(); }
num_found()1992 int num_found() { return uncompressed_cache_->num_found(); }
num_inserts()1993 int num_inserts() { return uncompressed_cache_->num_inserts(); }
1994
num_lookups_compressed()1995 int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
num_found_compressed()1996 int num_found_compressed() { return compressed_cache_->num_found(); }
num_inserts_compressed()1997 int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
1998
fill_cache()1999 bool fill_cache() { return fill_cache_; }
compression_enabled()2000 bool compression_enabled() { return compression_enabled_; }
has_compressed_cache()2001 bool has_compressed_cache() { return compressed_cache_ != nullptr; }
has_uncompressed_cache()2002 bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
2003
SetUpTestCase()2004 static void SetUpTestCase() {}
TearDownTestCase()2005 static void TearDownTestCase() {}
2006
2007 private:
2008 class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
2009 public:
MyFlushBlockPolicyFactory()2010 MyFlushBlockPolicyFactory() {}
2011
Name() const2012 virtual const char* Name() const override {
2013 return "MyFlushBlockPolicyFactory";
2014 }
2015
NewFlushBlockPolicy(const BlockBasedTableOptions &,const BlockBuilder & data_block_builder) const2016 virtual FlushBlockPolicy* NewFlushBlockPolicy(
2017 const BlockBasedTableOptions& /*table_options*/,
2018 const BlockBuilder& data_block_builder) const override {
2019 return new MyFlushBlockPolicy(data_block_builder);
2020 }
2021 };
2022
2023 class MyFlushBlockPolicy : public FlushBlockPolicy {
2024 public:
MyFlushBlockPolicy(const BlockBuilder & data_block_builder)2025 explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
2026 : num_keys_(0), data_block_builder_(data_block_builder) {}
2027
Update(const Slice &,const Slice &)2028 bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
2029 if (data_block_builder_.empty()) {
2030 // First key in this block
2031 num_keys_ = 1;
2032 return false;
2033 }
2034 // Flush every 10 keys
2035 if (num_keys_ == 10) {
2036 num_keys_ = 1;
2037 return true;
2038 }
2039 num_keys_++;
2040 return false;
2041 }
2042
2043 private:
2044 int num_keys_;
2045 const BlockBuilder& data_block_builder_;
2046 };
2047
2048 class MyBlockCache : public Cache {
2049 public:
MyBlockCache(std::shared_ptr<Cache> & target)2050 explicit MyBlockCache(std::shared_ptr<Cache>& target)
2051 : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
2052
Name() const2053 virtual const char* Name() const override { return "MyBlockCache"; }
2054
Insert(const Slice & key,void * value,size_t charge,Deleter * deleter,Handle ** handle=nullptr,Priority priority=Priority::LOW)2055 virtual Status Insert(const Slice& key, void* value, size_t charge,
2056 Deleter* deleter, Handle** handle = nullptr,
2057 Priority priority = Priority::LOW) override {
2058 num_inserts_++;
2059 return target_->Insert(key, value, charge, deleter, handle, priority);
2060 }
2061
Lookup(const Slice & key,Statistics * stats=nullptr)2062 virtual Handle* Lookup(const Slice& key,
2063 Statistics* stats = nullptr) override {
2064 num_lookups_++;
2065 Handle* handle = target_->Lookup(key, stats);
2066 if (handle != nullptr) {
2067 num_found_++;
2068 }
2069 return handle;
2070 }
2071
Ref(Handle * handle)2072 virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }
2073
Release(Handle * handle,bool force_erase=false)2074 virtual bool Release(Handle* handle, bool force_erase = false) override {
2075 return target_->Release(handle, force_erase);
2076 }
2077
Value(Handle * handle)2078 virtual void* Value(Handle* handle) override {
2079 return target_->Value(handle);
2080 }
2081
Erase(const Slice & key)2082 virtual void Erase(const Slice& key) override { target_->Erase(key); }
NewId()2083 virtual uint64_t NewId() override { return target_->NewId(); }
2084
SetCapacity(size_t capacity)2085 virtual void SetCapacity(size_t capacity) override {
2086 target_->SetCapacity(capacity);
2087 }
2088
SetStrictCapacityLimit(bool strict_capacity_limit)2089 virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
2090 target_->SetStrictCapacityLimit(strict_capacity_limit);
2091 }
2092
HasStrictCapacityLimit() const2093 virtual bool HasStrictCapacityLimit() const override {
2094 return target_->HasStrictCapacityLimit();
2095 }
2096
GetCapacity() const2097 virtual size_t GetCapacity() const override {
2098 return target_->GetCapacity();
2099 }
2100
GetUsage() const2101 virtual size_t GetUsage() const override { return target_->GetUsage(); }
2102
GetUsage(Handle * handle) const2103 virtual size_t GetUsage(Handle* handle) const override {
2104 return target_->GetUsage(handle);
2105 }
2106
GetPinnedUsage() const2107 virtual size_t GetPinnedUsage() const override {
2108 return target_->GetPinnedUsage();
2109 }
2110
GetCharge(Handle *) const2111 virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
2112
ApplyToAllCacheEntries(void (* callback)(void *,size_t),bool thread_safe)2113 virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
2114 bool thread_safe) override {
2115 return target_->ApplyToAllCacheEntries(callback, thread_safe);
2116 }
2117
EraseUnRefEntries()2118 virtual void EraseUnRefEntries() override {
2119 return target_->EraseUnRefEntries();
2120 }
2121
num_lookups()2122 int num_lookups() { return num_lookups_; }
2123
num_found()2124 int num_found() { return num_found_; }
2125
num_inserts()2126 int num_inserts() { return num_inserts_; }
2127
2128 private:
2129 std::shared_ptr<Cache> target_;
2130 int num_lookups_;
2131 int num_found_;
2132 int num_inserts_;
2133 };
2134
2135 std::shared_ptr<MyBlockCache> compressed_cache_;
2136 std::shared_ptr<MyBlockCache> uncompressed_cache_;
2137 bool compression_enabled_;
2138 std::vector<std::string> values_;
2139 std::vector<std::string> uncompressable_values_;
2140 bool fill_cache_;
2141 };
2142
// Exercises batched MultiGet and asserts on the exact number of file reads
// performed, which depends on which caches exist, whether compression is on,
// and ReadOptions::fill_cache. Keys Key(i) live in the compressible file;
// "a"+Key(i) in the incompressible file; blocks hold 10 keys each.
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  // Key(1) and Key(51) share data blocks with the warmed-up Key(0)/Key(50),
  // so if any cache was filled above, this round needs no file reads.
  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  // Blocks land in some cache only when fill_cache is on AND there is a
  // cache that can actually hold them (compressed cache needs compression).
  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  int expected_reads = random_reads + (read_from_cache ? 0 : 2);
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // 10 keys spanning 4 distinct data blocks (1-2, 15-16, 55, 81-85); two of
  // those blocks were already read (and possibly cached) above.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_ints.size(); ++i) {
    key_data[i] = Key(key_ints[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_ints.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 2 : 3);
  } else {
    expected_reads += (read_from_cache ? 2 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Same key pattern against the incompressible file ("a" prefix); none of
  // these blocks has been touched before.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    key_data[i] = "a" + Key(key_uncmp[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 3 : 3);
  } else {
    expected_reads += (read_from_cache ? 4 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Re-read a subset of the incompressible keys to check cache hits.
  keys.resize(5);
  statuses.resize(5);
  std::vector<int> key_tr{1, 2, 15, 16, 55};
  for (size_t i = 0; i < key_tr.size(); ++i) {
    key_data[i] = "a" + Key(key_tr[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_tr.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 0 : 2);
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  } else {
    if (has_uncompressed_cache()) {
      expected_reads += (read_from_cache ? 0 : 3);
      ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
    } else {
      // A rare case, even we enable the block compression but some of data
      // blocks are not compressed due to content. If user only enable the
      // compressed cache, the uncompressed blocks will not tbe cached, and
      // block reads will be triggered. The number of reads is related to
      // the compression algorithm.
      ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
    }
  }
}
2265
// Injects a checksum failure on the second block read during a parallel
// MultiGet and verifies the corruption is reported per-key: the first key
// still succeeds while the second comes back with Status::Corruption.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  int read_count = 0;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Fail checksum verification on exactly the second block read.
  SyncPoint::GetInstance()->SetCallBack(
      "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
        Status* s = static_cast<Status*>(status);
        read_count++;
        if (read_count == 2) {
          *s = Status::Corruption();
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  // values[1] is intentionally not checked: its block read was corrupted.
  // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::Corruption());

  SyncPoint::GetInstance()->DisableProcessing();
}
2303
// Forces TableCache::FindTable to fail during MultiGet and verifies the
// IOError is propagated to every key in the batch.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Every attempt to open a table during MultiGet reports an IOError.
  SyncPoint::GetInstance()->SetCallBack(
      "TableCache::MultiGet:FindTable", [&](void* status) {
        Status* s = static_cast<Status*>(status);
        *s = Status::IOError();
      });
  // DB open will create table readers unless we reduce the table cache
  // capacity.
  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
  // is allocated with max_open_files - 10 as capacity. So override
  // max_open_files to 11 so table cache capacity will become 1. This will
  // prevent file open during DB open and force the file to be opened
  // during MultiGet
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        // Use static_cast rather than a C-style cast for the void* payload.
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(CurrentOptions());

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  // Both keys fail because the table reader could not be opened.
  ASSERT_EQ(statuses[0], Status::IOError());
  ASSERT_EQ(statuses[1], Status::IOError());

  SyncPoint::GetInstance()->DisableProcessing();
}
2349
// Instantiate DBBasicTestWithParallelIO over all 16 parameter combinations.
// NOTE(review): INSTANTIATE_TEST_CASE_P is the legacy gtest spelling;
// newer gtest prefers INSTANTIATE_TEST_SUITE_P — confirm the vendored gtest
// version before renaming.
INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
                        // Params are as follows -
                        // Param 0 - Compressed cache enabled
                        // Param 1 - Uncompressed cache enabled
                        // Param 2 - Data compression enabled
                        // Param 3 - ReadOptions::fill_cache
                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                                           ::testing::Bool(),
                                           ::testing::Bool()));
2359
2360
2361 } // namespace ROCKSDB_NAMESPACE
2362
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
// Declaration only: the definition is linked in from a static library that
// registers custom objects for the tests.
extern "C" {
void RegisterCustomObjects(int argc, char** argv);
}
#else
// Default no-op stub used when no custom-object static library is linked.
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2370
// Test entry point. Order matters: the stack-trace handler is installed
// first so crashes anywhere (including gtest init) produce backtraces, and
// InitGoogleTest must consume gtest flags before RUN_ALL_TESTS.
int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  RegisterCustomObjects(argc, argv);
  return RUN_ALL_TESTS();
}
2377