1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/db_test_util.h"
11 #include "port/stack_trace.h"
12 #include "rocksdb/perf_context.h"
13 #include "rocksdb/utilities/debug.h"
14 #include "table/block_based/block_based_table_reader.h"
15 #include "table/block_based/block_builder.h"
16 #include "test_util/fault_injection_test_env.h"
17 #if !defined(ROCKSDB_LITE)
18 #include "test_util/sync_point.h"
19 #endif
20 
21 namespace ROCKSDB_NAMESPACE {
22 
23 class DBBasicTest : public DBTestBase {
24  public:
DBBasicTest()25   DBBasicTest() : DBTestBase("/db_basic_test") {}
26 };
27 
TEST_F(DBBasicTest,OpenWhenOpen)28 TEST_F(DBBasicTest, OpenWhenOpen) {
29   Options options = CurrentOptions();
30   options.env = env_;
31   ROCKSDB_NAMESPACE::DB* db2 = nullptr;
32   ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2);
33 
34   ASSERT_EQ(Status::Code::kIOError, s.code());
35   ASSERT_EQ(Status::SubCode::kNone, s.subcode());
36   ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
37 
38   delete db2;
39 }
40 
41 #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest,ReadOnlyDB)42 TEST_F(DBBasicTest, ReadOnlyDB) {
43   ASSERT_OK(Put("foo", "v1"));
44   ASSERT_OK(Put("bar", "v2"));
45   ASSERT_OK(Put("foo", "v3"));
46   Close();
47 
48   auto options = CurrentOptions();
49   assert(options.env == env_);
50   ASSERT_OK(ReadOnlyReopen(options));
51   ASSERT_EQ("v3", Get("foo"));
52   ASSERT_EQ("v2", Get("bar"));
53   Iterator* iter = db_->NewIterator(ReadOptions());
54   int count = 0;
55   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
56     ASSERT_OK(iter->status());
57     ++count;
58   }
59   ASSERT_EQ(count, 2);
60   delete iter;
61   Close();
62 
63   // Reopen and flush memtable.
64   Reopen(options);
65   Flush();
66   Close();
67   // Now check keys in read only mode.
68   ASSERT_OK(ReadOnlyReopen(options));
69   ASSERT_EQ("v3", Get("foo"));
70   ASSERT_EQ("v2", Get("bar"));
71   ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
72 }
73 
TEST_F(DBBasicTest,ReadOnlyDBWithWriteDBIdToManifestSet)74 TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
75   ASSERT_OK(Put("foo", "v1"));
76   ASSERT_OK(Put("bar", "v2"));
77   ASSERT_OK(Put("foo", "v3"));
78   Close();
79 
80   auto options = CurrentOptions();
81   options.write_dbid_to_manifest = true;
82   assert(options.env == env_);
83   ASSERT_OK(ReadOnlyReopen(options));
84   std::string db_id1;
85   db_->GetDbIdentity(db_id1);
86   ASSERT_EQ("v3", Get("foo"));
87   ASSERT_EQ("v2", Get("bar"));
88   Iterator* iter = db_->NewIterator(ReadOptions());
89   int count = 0;
90   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
91     ASSERT_OK(iter->status());
92     ++count;
93   }
94   ASSERT_EQ(count, 2);
95   delete iter;
96   Close();
97 
98   // Reopen and flush memtable.
99   Reopen(options);
100   Flush();
101   Close();
102   // Now check keys in read only mode.
103   ASSERT_OK(ReadOnlyReopen(options));
104   ASSERT_EQ("v3", Get("foo"));
105   ASSERT_EQ("v2", Get("bar"));
106   ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
107   std::string db_id2;
108   db_->GetDbIdentity(db_id2);
109   ASSERT_EQ(db_id1, db_id2);
110 }
111 
TEST_F(DBBasicTest,CompactedDB)112 TEST_F(DBBasicTest, CompactedDB) {
113   const uint64_t kFileSize = 1 << 20;
114   Options options = CurrentOptions();
115   options.disable_auto_compactions = true;
116   options.write_buffer_size = kFileSize;
117   options.target_file_size_base = kFileSize;
118   options.max_bytes_for_level_base = 1 << 30;
119   options.compression = kNoCompression;
120   Reopen(options);
121   // 1 L0 file, use CompactedDB if max_open_files = -1
122   ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
123   Flush();
124   Close();
125   ASSERT_OK(ReadOnlyReopen(options));
126   Status s = Put("new", "value");
127   ASSERT_EQ(s.ToString(),
128             "Not implemented: Not supported operation in read only mode.");
129   ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
130   Close();
131   options.max_open_files = -1;
132   ASSERT_OK(ReadOnlyReopen(options));
133   s = Put("new", "value");
134   ASSERT_EQ(s.ToString(),
135             "Not implemented: Not supported in compacted db mode.");
136   ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
137   Close();
138   Reopen(options);
139   // Add more L0 files
140   ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
141   Flush();
142   ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
143   Flush();
144   ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
145   ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
146   Flush();
147   Close();
148 
149   ASSERT_OK(ReadOnlyReopen(options));
150   // Fallback to read-only DB
151   s = Put("new", "value");
152   ASSERT_EQ(s.ToString(),
153             "Not implemented: Not supported operation in read only mode.");
154   Close();
155 
156   // Full compaction
157   Reopen(options);
158   // Add more keys
159   ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
160   ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
161   ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
162   ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
163   db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
164   ASSERT_EQ(3, NumTableFilesAtLevel(1));
165   Close();
166 
167   // CompactedDB
168   ASSERT_OK(ReadOnlyReopen(options));
169   s = Put("new", "value");
170   ASSERT_EQ(s.ToString(),
171             "Not implemented: Not supported in compacted db mode.");
172   ASSERT_EQ("NOT_FOUND", Get("abc"));
173   ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
174   ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
175   ASSERT_EQ("NOT_FOUND", Get("ccc"));
176   ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
177   ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
178   ASSERT_EQ("NOT_FOUND", Get("ggg"));
179   ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
180   ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
181   ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
182   ASSERT_EQ("NOT_FOUND", Get("kkk"));
183 
184   // MultiGet
185   std::vector<std::string> values;
186   std::vector<Status> status_list = dbfull()->MultiGet(
187       ReadOptions(),
188       std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
189                           Slice("ggg"), Slice("iii"), Slice("kkk")}),
190       &values);
191   ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
192   ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
193   ASSERT_OK(status_list[0]);
194   ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
195   ASSERT_TRUE(status_list[1].IsNotFound());
196   ASSERT_OK(status_list[2]);
197   ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
198   ASSERT_TRUE(status_list[3].IsNotFound());
199   ASSERT_OK(status_list[4]);
200   ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
201   ASSERT_TRUE(status_list[5].IsNotFound());
202 
203   Reopen(options);
204   // Add a key
205   ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
206   Close();
207   ASSERT_OK(ReadOnlyReopen(options));
208   s = Put("new", "value");
209   ASSERT_EQ(s.ToString(),
210             "Not implemented: Not supported operation in read only mode.");
211 }
212 
TEST_F(DBBasicTest,LevelLimitReopen)213 TEST_F(DBBasicTest, LevelLimitReopen) {
214   Options options = CurrentOptions();
215   CreateAndReopenWithCF({"pikachu"}, options);
216 
217   const std::string value(1024 * 1024, ' ');
218   int i = 0;
219   while (NumTableFilesAtLevel(2, 1) == 0) {
220     ASSERT_OK(Put(1, Key(i++), value));
221     dbfull()->TEST_WaitForFlushMemTable();
222     dbfull()->TEST_WaitForCompact();
223   }
224 
225   options.num_levels = 1;
226   options.max_bytes_for_level_multiplier_additional.resize(1, 1);
227   Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
228   ASSERT_EQ(s.IsInvalidArgument(), true);
229   ASSERT_EQ(s.ToString(),
230             "Invalid argument: db has more levels than options.num_levels");
231 
232   options.num_levels = 10;
233   options.max_bytes_for_level_multiplier_additional.resize(10, 1);
234   ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));
235 }
236 #endif  // ROCKSDB_LITE
237 
TEST_F(DBBasicTest,PutDeleteGet)238 TEST_F(DBBasicTest, PutDeleteGet) {
239   do {
240     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
241     ASSERT_OK(Put(1, "foo", "v1"));
242     ASSERT_EQ("v1", Get(1, "foo"));
243     ASSERT_OK(Put(1, "foo", "v2"));
244     ASSERT_EQ("v2", Get(1, "foo"));
245     ASSERT_OK(Delete(1, "foo"));
246     ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
247   } while (ChangeOptions());
248 }
249 
TEST_F(DBBasicTest,PutSingleDeleteGet)250 TEST_F(DBBasicTest, PutSingleDeleteGet) {
251   do {
252     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
253     ASSERT_OK(Put(1, "foo", "v1"));
254     ASSERT_EQ("v1", Get(1, "foo"));
255     ASSERT_OK(Put(1, "foo2", "v2"));
256     ASSERT_EQ("v2", Get(1, "foo2"));
257     ASSERT_OK(SingleDelete(1, "foo"));
258     ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
259     // Ski FIFO and universal compaction because they do not apply to the test
260     // case. Skip MergePut because single delete does not get removed when it
261     // encounters a merge.
262   } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
263                          kSkipMergePut));
264 }
265 
TEST_F(DBBasicTest,EmptyFlush)266 TEST_F(DBBasicTest, EmptyFlush) {
267   // It is possible to produce empty flushes when using single deletes. Tests
268   // whether empty flushes cause issues.
269   do {
270     Random rnd(301);
271 
272     Options options = CurrentOptions();
273     options.disable_auto_compactions = true;
274     CreateAndReopenWithCF({"pikachu"}, options);
275 
276     Put(1, "a", Slice());
277     SingleDelete(1, "a");
278     ASSERT_OK(Flush(1));
279 
280     ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
281     // Skip FIFO and  universal compaction as they do not apply to the test
282     // case. Skip MergePut because merges cannot be combined with single
283     // deletions.
284   } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
285                          kSkipMergePut));
286 }
287 
TEST_F(DBBasicTest,GetFromVersions)288 TEST_F(DBBasicTest, GetFromVersions) {
289   do {
290     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
291     ASSERT_OK(Put(1, "foo", "v1"));
292     ASSERT_OK(Flush(1));
293     ASSERT_EQ("v1", Get(1, "foo"));
294     ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
295   } while (ChangeOptions());
296 }
297 
298 #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest,GetSnapshot)299 TEST_F(DBBasicTest, GetSnapshot) {
300   anon::OptionsOverride options_override;
301   options_override.skip_policy = kSkipNoSnapshot;
302   do {
303     CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
304     // Try with both a short key and a long key
305     for (int i = 0; i < 2; i++) {
306       std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
307       ASSERT_OK(Put(1, key, "v1"));
308       const Snapshot* s1 = db_->GetSnapshot();
309       ASSERT_OK(Put(1, key, "v2"));
310       ASSERT_EQ("v2", Get(1, key));
311       ASSERT_EQ("v1", Get(1, key, s1));
312       ASSERT_OK(Flush(1));
313       ASSERT_EQ("v2", Get(1, key));
314       ASSERT_EQ("v1", Get(1, key, s1));
315       db_->ReleaseSnapshot(s1);
316     }
317   } while (ChangeOptions());
318 }
319 #endif  // ROCKSDB_LITE
320 
TEST_F(DBBasicTest,CheckLock)321 TEST_F(DBBasicTest, CheckLock) {
322   do {
323     DB* localdb;
324     Options options = CurrentOptions();
325     ASSERT_OK(TryReopen(options));
326 
327     // second open should fail
328     Status s = DB::Open(options, dbname_, &localdb);
329     ASSERT_NOK(s);
330 #ifdef OS_LINUX
331     ASSERT_TRUE(s.ToString().find("lock hold by current process") !=
332                 std::string::npos);
333 #endif  // OS_LINUX
334   } while (ChangeCompactOptions());
335 }
336 
TEST_F(DBBasicTest,FlushMultipleMemtable)337 TEST_F(DBBasicTest, FlushMultipleMemtable) {
338   do {
339     Options options = CurrentOptions();
340     WriteOptions writeOpt = WriteOptions();
341     writeOpt.disableWAL = true;
342     options.max_write_buffer_number = 4;
343     options.min_write_buffer_number_to_merge = 3;
344     options.max_write_buffer_size_to_maintain = -1;
345     CreateAndReopenWithCF({"pikachu"}, options);
346     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
347     ASSERT_OK(Flush(1));
348     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
349 
350     ASSERT_EQ("v1", Get(1, "foo"));
351     ASSERT_EQ("v1", Get(1, "bar"));
352     ASSERT_OK(Flush(1));
353   } while (ChangeCompactOptions());
354 }
355 
TEST_F(DBBasicTest,FlushEmptyColumnFamily)356 TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
357   // Block flush thread and disable compaction thread
358   env_->SetBackgroundThreads(1, Env::HIGH);
359   env_->SetBackgroundThreads(1, Env::LOW);
360   test::SleepingBackgroundTask sleeping_task_low;
361   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
362                  Env::Priority::LOW);
363   test::SleepingBackgroundTask sleeping_task_high;
364   env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
365                  &sleeping_task_high, Env::Priority::HIGH);
366 
367   Options options = CurrentOptions();
368   // disable compaction
369   options.disable_auto_compactions = true;
370   WriteOptions writeOpt = WriteOptions();
371   writeOpt.disableWAL = true;
372   options.max_write_buffer_number = 2;
373   options.min_write_buffer_number_to_merge = 1;
374   options.max_write_buffer_size_to_maintain =
375       static_cast<int64_t>(options.write_buffer_size);
376   CreateAndReopenWithCF({"pikachu"}, options);
377 
378   // Compaction can still go through even if no thread can flush the
379   // mem table.
380   ASSERT_OK(Flush(0));
381   ASSERT_OK(Flush(1));
382 
383   // Insert can go through
384   ASSERT_OK(dbfull()->Put(writeOpt, handles_[0], "foo", "v1"));
385   ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
386 
387   ASSERT_EQ("v1", Get(0, "foo"));
388   ASSERT_EQ("v1", Get(1, "bar"));
389 
390   sleeping_task_high.WakeUp();
391   sleeping_task_high.WaitUntilDone();
392 
393   // Flush can still go through.
394   ASSERT_OK(Flush(0));
395   ASSERT_OK(Flush(1));
396 
397   sleeping_task_low.WakeUp();
398   sleeping_task_low.WaitUntilDone();
399 }
400 
TEST_F(DBBasicTest,Flush)401 TEST_F(DBBasicTest, Flush) {
402   do {
403     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
404     WriteOptions writeOpt = WriteOptions();
405     writeOpt.disableWAL = true;
406     SetPerfLevel(kEnableTime);
407     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1"));
408     // this will now also flush the last 2 writes
409     ASSERT_OK(Flush(1));
410     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1"));
411 
412     get_perf_context()->Reset();
413     Get(1, "foo");
414     ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
415     ASSERT_EQ(2, (int)get_perf_context()->get_read_bytes);
416 
417     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
418     ASSERT_EQ("v1", Get(1, "foo"));
419     ASSERT_EQ("v1", Get(1, "bar"));
420 
421     writeOpt.disableWAL = true;
422     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2"));
423     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2"));
424     ASSERT_OK(Flush(1));
425 
426     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
427     ASSERT_EQ("v2", Get(1, "bar"));
428     get_perf_context()->Reset();
429     ASSERT_EQ("v2", Get(1, "foo"));
430     ASSERT_TRUE((int)get_perf_context()->get_from_output_files_time > 0);
431 
432     writeOpt.disableWAL = false;
433     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3"));
434     ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3"));
435     ASSERT_OK(Flush(1));
436 
437     ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
438     // 'foo' should be there because its put
439     // has WAL enabled.
440     ASSERT_EQ("v3", Get(1, "foo"));
441     ASSERT_EQ("v3", Get(1, "bar"));
442 
443     SetPerfLevel(kDisable);
444   } while (ChangeCompactOptions());
445 }
446 
TEST_F(DBBasicTest,ManifestRollOver)447 TEST_F(DBBasicTest, ManifestRollOver) {
448   do {
449     Options options;
450     options.max_manifest_file_size = 10;  // 10 bytes
451     options = CurrentOptions(options);
452     CreateAndReopenWithCF({"pikachu"}, options);
453     {
454       ASSERT_OK(Put(1, "manifest_key1", std::string(1000, '1')));
455       ASSERT_OK(Put(1, "manifest_key2", std::string(1000, '2')));
456       ASSERT_OK(Put(1, "manifest_key3", std::string(1000, '3')));
457       uint64_t manifest_before_flush = dbfull()->TEST_Current_Manifest_FileNo();
458       ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
459       uint64_t manifest_after_flush = dbfull()->TEST_Current_Manifest_FileNo();
460       ASSERT_GT(manifest_after_flush, manifest_before_flush);
461       ReopenWithColumnFamilies({"default", "pikachu"}, options);
462       ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), manifest_after_flush);
463       // check if a new manifest file got inserted or not.
464       ASSERT_EQ(std::string(1000, '1'), Get(1, "manifest_key1"));
465       ASSERT_EQ(std::string(1000, '2'), Get(1, "manifest_key2"));
466       ASSERT_EQ(std::string(1000, '3'), Get(1, "manifest_key3"));
467     }
468   } while (ChangeCompactOptions());
469 }
470 
TEST_F(DBBasicTest,IdentityAcrossRestarts1)471 TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
472   do {
473     std::string id1;
474     ASSERT_OK(db_->GetDbIdentity(id1));
475 
476     Options options = CurrentOptions();
477     Reopen(options);
478     std::string id2;
479     ASSERT_OK(db_->GetDbIdentity(id2));
480     // id1 should match id2 because identity was not regenerated
481     ASSERT_EQ(id1.compare(id2), 0);
482 
483     std::string idfilename = IdentityFileName(dbname_);
484     ASSERT_OK(env_->DeleteFile(idfilename));
485     Reopen(options);
486     std::string id3;
487     ASSERT_OK(db_->GetDbIdentity(id3));
488     if (options.write_dbid_to_manifest) {
489       ASSERT_EQ(id1.compare(id3), 0);
490     } else {
491       // id1 should NOT match id3 because identity was regenerated
492       ASSERT_NE(id1.compare(id3), 0);
493     }
494   } while (ChangeCompactOptions());
495 }
496 
TEST_F(DBBasicTest,IdentityAcrossRestarts2)497 TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
498   do {
499     std::string id1;
500     ASSERT_OK(db_->GetDbIdentity(id1));
501 
502     Options options = CurrentOptions();
503     options.write_dbid_to_manifest = true;
504     Reopen(options);
505     std::string id2;
506     ASSERT_OK(db_->GetDbIdentity(id2));
507     // id1 should match id2 because identity was not regenerated
508     ASSERT_EQ(id1.compare(id2), 0);
509 
510     std::string idfilename = IdentityFileName(dbname_);
511     ASSERT_OK(env_->DeleteFile(idfilename));
512     Reopen(options);
513     std::string id3;
514     ASSERT_OK(db_->GetDbIdentity(id3));
515     // id1 should NOT match id3 because identity was regenerated
516     ASSERT_EQ(id1, id3);
517   } while (ChangeCompactOptions());
518 }
519 
520 #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest,Snapshot)521 TEST_F(DBBasicTest, Snapshot) {
522   anon::OptionsOverride options_override;
523   options_override.skip_policy = kSkipNoSnapshot;
524   do {
525     CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
526     Put(0, "foo", "0v1");
527     Put(1, "foo", "1v1");
528 
529     const Snapshot* s1 = db_->GetSnapshot();
530     ASSERT_EQ(1U, GetNumSnapshots());
531     uint64_t time_snap1 = GetTimeOldestSnapshots();
532     ASSERT_GT(time_snap1, 0U);
533     ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
534     Put(0, "foo", "0v2");
535     Put(1, "foo", "1v2");
536 
537     env_->addon_time_.fetch_add(1);
538 
539     const Snapshot* s2 = db_->GetSnapshot();
540     ASSERT_EQ(2U, GetNumSnapshots());
541     ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
542     ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
543     Put(0, "foo", "0v3");
544     Put(1, "foo", "1v3");
545 
546     {
547       ManagedSnapshot s3(db_);
548       ASSERT_EQ(3U, GetNumSnapshots());
549       ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
550       ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
551 
552       Put(0, "foo", "0v4");
553       Put(1, "foo", "1v4");
554       ASSERT_EQ("0v1", Get(0, "foo", s1));
555       ASSERT_EQ("1v1", Get(1, "foo", s1));
556       ASSERT_EQ("0v2", Get(0, "foo", s2));
557       ASSERT_EQ("1v2", Get(1, "foo", s2));
558       ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
559       ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
560       ASSERT_EQ("0v4", Get(0, "foo"));
561       ASSERT_EQ("1v4", Get(1, "foo"));
562     }
563 
564     ASSERT_EQ(2U, GetNumSnapshots());
565     ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
566     ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
567     ASSERT_EQ("0v1", Get(0, "foo", s1));
568     ASSERT_EQ("1v1", Get(1, "foo", s1));
569     ASSERT_EQ("0v2", Get(0, "foo", s2));
570     ASSERT_EQ("1v2", Get(1, "foo", s2));
571     ASSERT_EQ("0v4", Get(0, "foo"));
572     ASSERT_EQ("1v4", Get(1, "foo"));
573 
574     db_->ReleaseSnapshot(s1);
575     ASSERT_EQ("0v2", Get(0, "foo", s2));
576     ASSERT_EQ("1v2", Get(1, "foo", s2));
577     ASSERT_EQ("0v4", Get(0, "foo"));
578     ASSERT_EQ("1v4", Get(1, "foo"));
579     ASSERT_EQ(1U, GetNumSnapshots());
580     ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
581     ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());
582 
583     db_->ReleaseSnapshot(s2);
584     ASSERT_EQ(0U, GetNumSnapshots());
585     ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
586     ASSERT_EQ("0v4", Get(0, "foo"));
587     ASSERT_EQ("1v4", Get(1, "foo"));
588   } while (ChangeOptions());
589 }
590 
591 #endif  // ROCKSDB_LITE
592 
TEST_F(DBBasicTest,CompactBetweenSnapshots)593 TEST_F(DBBasicTest, CompactBetweenSnapshots) {
594   anon::OptionsOverride options_override;
595   options_override.skip_policy = kSkipNoSnapshot;
596   do {
597     Options options = CurrentOptions(options_override);
598     options.disable_auto_compactions = true;
599     CreateAndReopenWithCF({"pikachu"}, options);
600     Random rnd(301);
601     FillLevels("a", "z", 1);
602 
603     Put(1, "foo", "first");
604     const Snapshot* snapshot1 = db_->GetSnapshot();
605     Put(1, "foo", "second");
606     Put(1, "foo", "third");
607     Put(1, "foo", "fourth");
608     const Snapshot* snapshot2 = db_->GetSnapshot();
609     Put(1, "foo", "fifth");
610     Put(1, "foo", "sixth");
611 
612     // All entries (including duplicates) exist
613     // before any compaction or flush is triggered.
614     ASSERT_EQ(AllEntriesFor("foo", 1),
615               "[ sixth, fifth, fourth, third, second, first ]");
616     ASSERT_EQ("sixth", Get(1, "foo"));
617     ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
618     ASSERT_EQ("first", Get(1, "foo", snapshot1));
619 
620     // After a flush, "second", "third" and "fifth" should
621     // be removed
622     ASSERT_OK(Flush(1));
623     ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
624 
625     // after we release the snapshot1, only two values left
626     db_->ReleaseSnapshot(snapshot1);
627     FillLevels("a", "z", 1);
628     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
629                            nullptr);
630 
631     // We have only one valid snapshot snapshot2. Since snapshot1 is
632     // not valid anymore, "first" should be removed by a compaction.
633     ASSERT_EQ("sixth", Get(1, "foo"));
634     ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
635     ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");
636 
637     // after we release the snapshot2, only one value should be left
638     db_->ReleaseSnapshot(snapshot2);
639     FillLevels("a", "z", 1);
640     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
641                            nullptr);
642     ASSERT_EQ("sixth", Get(1, "foo"));
643     ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
644   } while (ChangeOptions(kSkipFIFOCompaction));
645 }
646 
TEST_F(DBBasicTest,DBOpen_Options)647 TEST_F(DBBasicTest, DBOpen_Options) {
648   Options options = CurrentOptions();
649   Close();
650   Destroy(options);
651 
652   // Does not exist, and create_if_missing == false: error
653   DB* db = nullptr;
654   options.create_if_missing = false;
655   Status s = DB::Open(options, dbname_, &db);
656   ASSERT_TRUE(strstr(s.ToString().c_str(), "does not exist") != nullptr);
657   ASSERT_TRUE(db == nullptr);
658 
659   // Does not exist, and create_if_missing == true: OK
660   options.create_if_missing = true;
661   s = DB::Open(options, dbname_, &db);
662   ASSERT_OK(s);
663   ASSERT_TRUE(db != nullptr);
664 
665   delete db;
666   db = nullptr;
667 
668   // Does exist, and error_if_exists == true: error
669   options.create_if_missing = false;
670   options.error_if_exists = true;
671   s = DB::Open(options, dbname_, &db);
672   ASSERT_TRUE(strstr(s.ToString().c_str(), "exists") != nullptr);
673   ASSERT_TRUE(db == nullptr);
674 
675   // Does exist, and error_if_exists == false: OK
676   options.create_if_missing = true;
677   options.error_if_exists = false;
678   s = DB::Open(options, dbname_, &db);
679   ASSERT_OK(s);
680   ASSERT_TRUE(db != nullptr);
681 
682   delete db;
683   db = nullptr;
684 }
685 
TEST_F(DBBasicTest,CompactOnFlush)686 TEST_F(DBBasicTest, CompactOnFlush) {
687   anon::OptionsOverride options_override;
688   options_override.skip_policy = kSkipNoSnapshot;
689   do {
690     Options options = CurrentOptions(options_override);
691     options.disable_auto_compactions = true;
692     CreateAndReopenWithCF({"pikachu"}, options);
693 
694     Put(1, "foo", "v1");
695     ASSERT_OK(Flush(1));
696     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
697 
698     // Write two new keys
699     Put(1, "a", "begin");
700     Put(1, "z", "end");
701     Flush(1);
702 
703     // Case1: Delete followed by a put
704     Delete(1, "foo");
705     Put(1, "foo", "v2");
706     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
707 
708     // After the current memtable is flushed, the DEL should
709     // have been removed
710     ASSERT_OK(Flush(1));
711     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
712 
713     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
714                            nullptr);
715     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
716 
717     // Case 2: Delete followed by another delete
718     Delete(1, "foo");
719     Delete(1, "foo");
720     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
721     ASSERT_OK(Flush(1));
722     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
723     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
724                            nullptr);
725     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
726 
727     // Case 3: Put followed by a delete
728     Put(1, "foo", "v3");
729     Delete(1, "foo");
730     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
731     ASSERT_OK(Flush(1));
732     ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
733     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
734                            nullptr);
735     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
736 
737     // Case 4: Put followed by another Put
738     Put(1, "foo", "v4");
739     Put(1, "foo", "v5");
740     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
741     ASSERT_OK(Flush(1));
742     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
743     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
744                            nullptr);
745     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
746 
747     // clear database
748     Delete(1, "foo");
749     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
750                            nullptr);
751     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
752 
753     // Case 5: Put followed by snapshot followed by another Put
754     // Both puts should remain.
755     Put(1, "foo", "v6");
756     const Snapshot* snapshot = db_->GetSnapshot();
757     Put(1, "foo", "v7");
758     ASSERT_OK(Flush(1));
759     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
760     db_->ReleaseSnapshot(snapshot);
761 
762     // clear database
763     Delete(1, "foo");
764     dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
765                            nullptr);
766     ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
767 
768     // Case 5: snapshot followed by a put followed by another Put
769     // Only the last put should remain.
770     const Snapshot* snapshot1 = db_->GetSnapshot();
771     Put(1, "foo", "v8");
772     Put(1, "foo", "v9");
773     ASSERT_OK(Flush(1));
774     ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
775     db_->ReleaseSnapshot(snapshot1);
776   } while (ChangeCompactOptions());
777 }
778 
TEST_F(DBBasicTest,FlushOneColumnFamily)779 TEST_F(DBBasicTest, FlushOneColumnFamily) {
780   Options options = CurrentOptions();
781   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
782                          "alyosha", "popovich"},
783                         options);
784 
785   ASSERT_OK(Put(0, "Default", "Default"));
786   ASSERT_OK(Put(1, "pikachu", "pikachu"));
787   ASSERT_OK(Put(2, "ilya", "ilya"));
788   ASSERT_OK(Put(3, "muromec", "muromec"));
789   ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
790   ASSERT_OK(Put(5, "nikitich", "nikitich"));
791   ASSERT_OK(Put(6, "alyosha", "alyosha"));
792   ASSERT_OK(Put(7, "popovich", "popovich"));
793 
794   for (int i = 0; i < 8; ++i) {
795     Flush(i);
796     auto tables = ListTableFiles(env_, dbname_);
797     ASSERT_EQ(tables.size(), i + 1U);
798   }
799 }
800 
TEST_F(DBBasicTest,MultiGetSimple)801 TEST_F(DBBasicTest, MultiGetSimple) {
802   do {
803     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
804     SetPerfLevel(kEnableCount);
805     ASSERT_OK(Put(1, "k1", "v1"));
806     ASSERT_OK(Put(1, "k2", "v2"));
807     ASSERT_OK(Put(1, "k3", "v3"));
808     ASSERT_OK(Put(1, "k4", "v4"));
809     ASSERT_OK(Delete(1, "k4"));
810     ASSERT_OK(Put(1, "k5", "v5"));
811     ASSERT_OK(Delete(1, "no_key"));
812 
813     std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
814 
815     std::vector<std::string> values(20, "Temporary data to be overwritten");
816     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
817 
818     get_perf_context()->Reset();
819     std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
820     ASSERT_EQ(values.size(), keys.size());
821     ASSERT_EQ(values[0], "v1");
822     ASSERT_EQ(values[1], "v2");
823     ASSERT_EQ(values[2], "v3");
824     ASSERT_EQ(values[4], "v5");
825     // four kv pairs * two bytes per value
826     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
827 
828     ASSERT_OK(s[0]);
829     ASSERT_OK(s[1]);
830     ASSERT_OK(s[2]);
831     ASSERT_TRUE(s[3].IsNotFound());
832     ASSERT_OK(s[4]);
833     ASSERT_TRUE(s[5].IsNotFound());
834     SetPerfLevel(kDisable);
835   } while (ChangeCompactOptions());
836 }
837 
TEST_F(DBBasicTest,MultiGetEmpty)838 TEST_F(DBBasicTest, MultiGetEmpty) {
839   do {
840     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
841     // Empty Key Set
842     std::vector<Slice> keys;
843     std::vector<std::string> values;
844     std::vector<ColumnFamilyHandle*> cfs;
845     std::vector<Status> s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
846     ASSERT_EQ(s.size(), 0U);
847 
848     // Empty Database, Empty Key Set
849     Options options = CurrentOptions();
850     options.create_if_missing = true;
851     DestroyAndReopen(options);
852     CreateAndReopenWithCF({"pikachu"}, options);
853     s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
854     ASSERT_EQ(s.size(), 0U);
855 
856     // Empty Database, Search for Keys
857     keys.resize(2);
858     keys[0] = "a";
859     keys[1] = "b";
860     cfs.push_back(handles_[0]);
861     cfs.push_back(handles_[1]);
862     s = db_->MultiGet(ReadOptions(), cfs, keys, &values);
863     ASSERT_EQ(static_cast<int>(s.size()), 2);
864     ASSERT_TRUE(s[0].IsNotFound() && s[1].IsNotFound());
865   } while (ChangeCompactOptions());
866 }
867 
TEST_F(DBBasicTest,ChecksumTest)868 TEST_F(DBBasicTest, ChecksumTest) {
869   BlockBasedTableOptions table_options;
870   Options options = CurrentOptions();
871   // change when new checksum type added
872   int max_checksum = static_cast<int>(kxxHash64);
873   const int kNumPerFile = 2;
874 
875   // generate one table with each type of checksum
876   for (int i = 0; i <= max_checksum; ++i) {
877     table_options.checksum = static_cast<ChecksumType>(i);
878     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
879     Reopen(options);
880     for (int j = 0; j < kNumPerFile; ++j) {
881       ASSERT_OK(Put(Key(i * kNumPerFile + j), Key(i * kNumPerFile + j)));
882     }
883     ASSERT_OK(Flush());
884   }
885 
886   // with each valid checksum type setting...
887   for (int i = 0; i <= max_checksum; ++i) {
888     table_options.checksum = static_cast<ChecksumType>(i);
889     options.table_factory.reset(NewBlockBasedTableFactory(table_options));
890     Reopen(options);
891     // verify every type of checksum (should be regardless of that setting)
892     for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) {
893       ASSERT_EQ(Key(j), Get(Key(j)));
894     }
895   }
896 }
897 
898 // On Windows you can have either memory mapped file or a file
899 // with unbuffered access. So this asserts and does not make
900 // sense to run
901 #ifndef OS_WIN
TEST_F(DBBasicTest,MmapAndBufferOptions)902 TEST_F(DBBasicTest, MmapAndBufferOptions) {
903   if (!IsMemoryMappedAccessSupported()) {
904     return;
905   }
906   Options options = CurrentOptions();
907 
908   options.use_direct_reads = true;
909   options.allow_mmap_reads = true;
910   ASSERT_NOK(TryReopen(options));
911 
912   // All other combinations are acceptable
913   options.use_direct_reads = false;
914   ASSERT_OK(TryReopen(options));
915 
916   if (IsDirectIOSupported()) {
917     options.use_direct_reads = true;
918     options.allow_mmap_reads = false;
919     ASSERT_OK(TryReopen(options));
920   }
921 
922   options.use_direct_reads = false;
923   ASSERT_OK(TryReopen(options));
924 }
925 #endif
926 
927 class TestEnv : public EnvWrapper {
928  public:
TestEnv(Env * base_env)929   explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
930 
931   class TestLogger : public Logger {
932    public:
933     using Logger::Logv;
TestLogger(TestEnv * env_ptr)934     explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger()935     ~TestLogger() override {
936       if (!closed_) {
937         CloseHelper();
938       }
939     }
Logv(const char *,va_list)940     void Logv(const char* /*format*/, va_list /*ap*/) override {}
941 
942    protected:
CloseImpl()943     Status CloseImpl() override { return CloseHelper(); }
944 
945    private:
CloseHelper()946     Status CloseHelper() {
947       env->CloseCountInc();
948       ;
949       return Status::IOError();
950     }
951     TestEnv* env;
952   };
953 
CloseCountInc()954   void CloseCountInc() { close_count++; }
955 
GetCloseCount()956   int GetCloseCount() { return close_count; }
957 
NewLogger(const std::string &,std::shared_ptr<Logger> * result)958   Status NewLogger(const std::string& /*fname*/,
959                    std::shared_ptr<Logger>* result) override {
960     result->reset(new TestLogger(this));
961     return Status::OK();
962   }
963 
964  private:
965   int close_count;
966 };
967 
TEST_F(DBBasicTest,DBClose)968 TEST_F(DBBasicTest, DBClose) {
969   Options options = GetDefaultOptions();
970   std::string dbname = test::PerThreadDBPath("db_close_test");
971   ASSERT_OK(DestroyDB(dbname, options));
972 
973   DB* db = nullptr;
974   TestEnv* env = new TestEnv(env_);
975   std::unique_ptr<TestEnv> local_env_guard(env);
976   options.create_if_missing = true;
977   options.env = env;
978   Status s = DB::Open(options, dbname, &db);
979   ASSERT_OK(s);
980   ASSERT_TRUE(db != nullptr);
981 
982   s = db->Close();
983   ASSERT_EQ(env->GetCloseCount(), 1);
984   ASSERT_EQ(s, Status::IOError());
985 
986   delete db;
987   ASSERT_EQ(env->GetCloseCount(), 1);
988 
989   // Do not call DB::Close() and ensure our logger Close() still gets called
990   s = DB::Open(options, dbname, &db);
991   ASSERT_OK(s);
992   ASSERT_TRUE(db != nullptr);
993   delete db;
994   ASSERT_EQ(env->GetCloseCount(), 2);
995 
996   // Provide our own logger and ensure DB::Close() does not close it
997   options.info_log.reset(new TestEnv::TestLogger(env));
998   options.create_if_missing = false;
999   s = DB::Open(options, dbname, &db);
1000   ASSERT_OK(s);
1001   ASSERT_TRUE(db != nullptr);
1002 
1003   s = db->Close();
1004   ASSERT_EQ(s, Status::OK());
1005   delete db;
1006   ASSERT_EQ(env->GetCloseCount(), 2);
1007   options.info_log.reset();
1008   ASSERT_EQ(env->GetCloseCount(), 3);
1009 }
1010 
TEST_F(DBBasicTest,DBCloseFlushError)1011 TEST_F(DBBasicTest, DBCloseFlushError) {
1012   std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
1013       new FaultInjectionTestEnv(env_));
1014   Options options = GetDefaultOptions();
1015   options.create_if_missing = true;
1016   options.manual_wal_flush = true;
1017   options.write_buffer_size = 100;
1018   options.env = fault_injection_env.get();
1019 
1020   Reopen(options);
1021   ASSERT_OK(Put("key1", "value1"));
1022   ASSERT_OK(Put("key2", "value2"));
1023   ASSERT_OK(dbfull()->TEST_SwitchMemtable());
1024   ASSERT_OK(Put("key3", "value3"));
1025   fault_injection_env->SetFilesystemActive(false);
1026   Status s = dbfull()->Close();
1027   fault_injection_env->SetFilesystemActive(true);
1028   ASSERT_NE(s, Status::OK());
1029 
1030   Destroy(options);
1031 }
1032 
1033 class DBMultiGetTestWithParam : public DBBasicTest,
1034                                 public testing::WithParamInterface<bool> {};
1035 
TEST_P(DBMultiGetTestWithParam,MultiGetMultiCF)1036 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
1037   Options options = CurrentOptions();
1038   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1039                          "alyosha", "popovich"},
1040                         options);
1041   // <CF, key, value> tuples
1042   std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
1043   static const int num_keys = 24;
1044   cf_kv_vec.reserve(num_keys);
1045 
1046   for (int i = 0; i < num_keys; ++i) {
1047     int cf = i / 3;
1048     int cf_key = 1 % 3;
1049     cf_kv_vec.emplace_back(std::make_tuple(
1050         cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
1051         "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
1052     ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
1053                   std::get<2>(cf_kv_vec[i])));
1054   }
1055 
1056   int get_sv_count = 0;
1057   ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
1058   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1059       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1060         if (++get_sv_count == 2) {
1061           // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
1062           // is forced to repeat the process
1063           for (int i = 0; i < num_keys; ++i) {
1064             int cf = i / 3;
1065             int cf_key = i % 8;
1066             if (cf_key == 0) {
1067               ASSERT_OK(Flush(cf));
1068             }
1069             ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
1070                           std::get<2>(cf_kv_vec[i]) + "_2"));
1071           }
1072         }
1073         if (get_sv_count == 11) {
1074           for (int i = 0; i < 8; ++i) {
1075             auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1076                             db->GetColumnFamilyHandle(i))
1077                             ->cfd();
1078             ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1079           }
1080         }
1081       });
1082   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1083 
1084   std::vector<int> cfs;
1085   std::vector<std::string> keys;
1086   std::vector<std::string> values;
1087 
1088   for (int i = 0; i < num_keys; ++i) {
1089     cfs.push_back(std::get<0>(cf_kv_vec[i]));
1090     keys.push_back(std::get<1>(cf_kv_vec[i]));
1091   }
1092 
1093   values = MultiGet(cfs, keys, nullptr, GetParam());
1094   ASSERT_EQ(values.size(), num_keys);
1095   for (unsigned int j = 0; j < values.size(); ++j) {
1096     ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
1097   }
1098 
1099   keys.clear();
1100   cfs.clear();
1101   cfs.push_back(std::get<0>(cf_kv_vec[0]));
1102   keys.push_back(std::get<1>(cf_kv_vec[0]));
1103   cfs.push_back(std::get<0>(cf_kv_vec[3]));
1104   keys.push_back(std::get<1>(cf_kv_vec[3]));
1105   cfs.push_back(std::get<0>(cf_kv_vec[4]));
1106   keys.push_back(std::get<1>(cf_kv_vec[4]));
1107   values = MultiGet(cfs, keys, nullptr, GetParam());
1108   ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
1109   ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
1110   ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");
1111 
1112   keys.clear();
1113   cfs.clear();
1114   cfs.push_back(std::get<0>(cf_kv_vec[7]));
1115   keys.push_back(std::get<1>(cf_kv_vec[7]));
1116   cfs.push_back(std::get<0>(cf_kv_vec[6]));
1117   keys.push_back(std::get<1>(cf_kv_vec[6]));
1118   cfs.push_back(std::get<0>(cf_kv_vec[1]));
1119   keys.push_back(std::get<1>(cf_kv_vec[1]));
1120   values = MultiGet(cfs, keys, nullptr, GetParam());
1121   ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
1122   ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
1123   ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");
1124 
1125   for (int cf = 0; cf < 8; ++cf) {
1126     auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1127                     reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
1128                     ->cfd();
1129     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1130     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
1131   }
1132 }
1133 
TEST_P(DBMultiGetTestWithParam,MultiGetMultiCFMutex)1134 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
1135   Options options = CurrentOptions();
1136   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1137                          "alyosha", "popovich"},
1138                         options);
1139 
1140   for (int i = 0; i < 8; ++i) {
1141     ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1142                   "cf" + std::to_string(i) + "_val"));
1143   }
1144 
1145   int get_sv_count = 0;
1146   int retries = 0;
1147   bool last_try = false;
1148   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1149       "DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
1150         last_try = true;
1151         ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
1152       });
1153   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1154       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1155         if (last_try) {
1156           return;
1157         }
1158         if (++get_sv_count == 2) {
1159           ++retries;
1160           get_sv_count = 0;
1161           for (int i = 0; i < 8; ++i) {
1162             ASSERT_OK(Flush(i));
1163             ASSERT_OK(Put(
1164                 i, "cf" + std::to_string(i) + "_key",
1165                 "cf" + std::to_string(i) + "_val" + std::to_string(retries)));
1166           }
1167         }
1168       });
1169   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1170 
1171   std::vector<int> cfs;
1172   std::vector<std::string> keys;
1173   std::vector<std::string> values;
1174 
1175   for (int i = 0; i < 8; ++i) {
1176     cfs.push_back(i);
1177     keys.push_back("cf" + std::to_string(i) + "_key");
1178   }
1179 
1180   values = MultiGet(cfs, keys, nullptr, GetParam());
1181   ASSERT_TRUE(last_try);
1182   ASSERT_EQ(values.size(), 8);
1183   for (unsigned int j = 0; j < values.size(); ++j) {
1184     ASSERT_EQ(values[j],
1185               "cf" + std::to_string(j) + "_val" + std::to_string(retries));
1186   }
1187   for (int i = 0; i < 8; ++i) {
1188     auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1189                     reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
1190                     ->cfd();
1191     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1192   }
1193 }
1194 
TEST_P(DBMultiGetTestWithParam,MultiGetMultiCFSnapshot)1195 TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
1196   Options options = CurrentOptions();
1197   CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
1198                          "alyosha", "popovich"},
1199                         options);
1200 
1201   for (int i = 0; i < 8; ++i) {
1202     ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1203                   "cf" + std::to_string(i) + "_val"));
1204   }
1205 
1206   int get_sv_count = 0;
1207   ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
1208   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1209       "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
1210         if (++get_sv_count == 2) {
1211           for (int i = 0; i < 8; ++i) {
1212             ASSERT_OK(Flush(i));
1213             ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
1214                           "cf" + std::to_string(i) + "_val2"));
1215           }
1216         }
1217         if (get_sv_count == 8) {
1218           for (int i = 0; i < 8; ++i) {
1219             auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1220                             db->GetColumnFamilyHandle(i))
1221                             ->cfd();
1222             ASSERT_TRUE(
1223                 (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
1224                 (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
1225           }
1226         }
1227       });
1228   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
1229 
1230   std::vector<int> cfs;
1231   std::vector<std::string> keys;
1232   std::vector<std::string> values;
1233 
1234   for (int i = 0; i < 8; ++i) {
1235     cfs.push_back(i);
1236     keys.push_back("cf" + std::to_string(i) + "_key");
1237   }
1238 
1239   const Snapshot* snapshot = db_->GetSnapshot();
1240   values = MultiGet(cfs, keys, snapshot, GetParam());
1241   db_->ReleaseSnapshot(snapshot);
1242   ASSERT_EQ(values.size(), 8);
1243   for (unsigned int j = 0; j < values.size(); ++j) {
1244     ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
1245   }
1246   for (int i = 0; i < 8; ++i) {
1247     auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
1248                     reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(i))
1249                     ->cfd();
1250     ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
1251   }
1252 }
1253 
1254 INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
1255                         testing::Bool());
1256 
TEST_F(DBBasicTest,MultiGetBatchedSimpleUnsorted)1257 TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
1258   do {
1259     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1260     SetPerfLevel(kEnableCount);
1261     ASSERT_OK(Put(1, "k1", "v1"));
1262     ASSERT_OK(Put(1, "k2", "v2"));
1263     ASSERT_OK(Put(1, "k3", "v3"));
1264     ASSERT_OK(Put(1, "k4", "v4"));
1265     ASSERT_OK(Delete(1, "k4"));
1266     ASSERT_OK(Put(1, "k5", "v5"));
1267     ASSERT_OK(Delete(1, "no_key"));
1268 
1269     get_perf_context()->Reset();
1270 
1271     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
1272     std::vector<PinnableSlice> values(keys.size());
1273     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1274     std::vector<Status> s(keys.size());
1275 
1276     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1277                   values.data(), s.data(), false);
1278 
1279     ASSERT_EQ(values.size(), keys.size());
1280     ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1");
1281     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v2");
1282     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
1283     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1284     // four kv pairs * two bytes per value
1285     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
1286 
1287     ASSERT_TRUE(s[0].IsNotFound());
1288     ASSERT_OK(s[1]);
1289     ASSERT_TRUE(s[2].IsNotFound());
1290     ASSERT_OK(s[3]);
1291     ASSERT_OK(s[4]);
1292     ASSERT_OK(s[5]);
1293 
1294     SetPerfLevel(kDisable);
1295   } while (ChangeCompactOptions());
1296 }
1297 
TEST_F(DBBasicTest,MultiGetBatchedSimpleSorted)1298 TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
1299   do {
1300     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
1301     SetPerfLevel(kEnableCount);
1302     ASSERT_OK(Put(1, "k1", "v1"));
1303     ASSERT_OK(Put(1, "k2", "v2"));
1304     ASSERT_OK(Put(1, "k3", "v3"));
1305     ASSERT_OK(Put(1, "k4", "v4"));
1306     ASSERT_OK(Delete(1, "k4"));
1307     ASSERT_OK(Put(1, "k5", "v5"));
1308     ASSERT_OK(Delete(1, "no_key"));
1309 
1310     get_perf_context()->Reset();
1311 
1312     std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
1313     std::vector<PinnableSlice> values(keys.size());
1314     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1315     std::vector<Status> s(keys.size());
1316 
1317     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1318                   values.data(), s.data(), true);
1319 
1320     ASSERT_EQ(values.size(), keys.size());
1321     ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1");
1322     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v2");
1323     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
1324     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v5");
1325     // four kv pairs * two bytes per value
1326     ASSERT_EQ(8, (int)get_perf_context()->multiget_read_bytes);
1327 
1328     ASSERT_OK(s[0]);
1329     ASSERT_OK(s[1]);
1330     ASSERT_OK(s[2]);
1331     ASSERT_TRUE(s[3].IsNotFound());
1332     ASSERT_OK(s[4]);
1333     ASSERT_TRUE(s[5].IsNotFound());
1334 
1335     SetPerfLevel(kDisable);
1336   } while (ChangeCompactOptions());
1337 }
1338 
TEST_F(DBBasicTest,MultiGetBatchedMultiLevel)1339 TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
1340   Options options = CurrentOptions();
1341   options.disable_auto_compactions = true;
1342   Reopen(options);
1343   int num_keys = 0;
1344 
1345   for (int i = 0; i < 128; ++i) {
1346     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1347     num_keys++;
1348     if (num_keys == 8) {
1349       Flush();
1350       num_keys = 0;
1351     }
1352   }
1353   if (num_keys > 0) {
1354     Flush();
1355     num_keys = 0;
1356   }
1357   MoveFilesToLevel(2);
1358 
1359   for (int i = 0; i < 128; i += 3) {
1360     ASSERT_OK(Put("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1361     num_keys++;
1362     if (num_keys == 8) {
1363       Flush();
1364       num_keys = 0;
1365     }
1366   }
1367   if (num_keys > 0) {
1368     Flush();
1369     num_keys = 0;
1370   }
1371   MoveFilesToLevel(1);
1372 
1373   for (int i = 0; i < 128; i += 5) {
1374     ASSERT_OK(Put("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1375     num_keys++;
1376     if (num_keys == 8) {
1377       Flush();
1378       num_keys = 0;
1379     }
1380   }
1381   if (num_keys > 0) {
1382     Flush();
1383     num_keys = 0;
1384   }
1385   ASSERT_EQ(0, num_keys);
1386 
1387   for (int i = 0; i < 128; i += 9) {
1388     ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1389   }
1390 
1391   std::vector<std::string> keys;
1392   std::vector<std::string> values;
1393 
1394   for (int i = 64; i < 80; ++i) {
1395     keys.push_back("key_" + std::to_string(i));
1396   }
1397 
1398   values = MultiGet(keys, nullptr);
1399   ASSERT_EQ(values.size(), 16);
1400   for (unsigned int j = 0; j < values.size(); ++j) {
1401     int key = j + 64;
1402     if (key % 9 == 0) {
1403       ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
1404     } else if (key % 5 == 0) {
1405       ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
1406     } else if (key % 3 == 0) {
1407       ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
1408     } else {
1409       ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
1410     }
1411   }
1412 }
1413 
TEST_F(DBBasicTest,MultiGetBatchedMultiLevelMerge)1414 TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
1415   Options options = CurrentOptions();
1416   options.disable_auto_compactions = true;
1417   options.merge_operator = MergeOperators::CreateStringAppendOperator();
1418   BlockBasedTableOptions bbto;
1419   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1420   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1421   Reopen(options);
1422   int num_keys = 0;
1423 
1424   for (int i = 0; i < 128; ++i) {
1425     ASSERT_OK(Put("key_" + std::to_string(i), "val_l2_" + std::to_string(i)));
1426     num_keys++;
1427     if (num_keys == 8) {
1428       Flush();
1429       num_keys = 0;
1430     }
1431   }
1432   if (num_keys > 0) {
1433     Flush();
1434     num_keys = 0;
1435   }
1436   MoveFilesToLevel(2);
1437 
1438   for (int i = 0; i < 128; i += 3) {
1439     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l1_" + std::to_string(i)));
1440     num_keys++;
1441     if (num_keys == 8) {
1442       Flush();
1443       num_keys = 0;
1444     }
1445   }
1446   if (num_keys > 0) {
1447     Flush();
1448     num_keys = 0;
1449   }
1450   MoveFilesToLevel(1);
1451 
1452   for (int i = 0; i < 128; i += 5) {
1453     ASSERT_OK(Merge("key_" + std::to_string(i), "val_l0_" + std::to_string(i)));
1454     num_keys++;
1455     if (num_keys == 8) {
1456       Flush();
1457       num_keys = 0;
1458     }
1459   }
1460   if (num_keys > 0) {
1461     Flush();
1462     num_keys = 0;
1463   }
1464   ASSERT_EQ(0, num_keys);
1465 
1466   for (int i = 0; i < 128; i += 9) {
1467     ASSERT_OK(
1468         Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
1469   }
1470 
1471   std::vector<std::string> keys;
1472   std::vector<std::string> values;
1473 
1474   for (int i = 32; i < 80; ++i) {
1475     keys.push_back("key_" + std::to_string(i));
1476   }
1477 
1478   values = MultiGet(keys, nullptr);
1479   ASSERT_EQ(values.size(), keys.size());
1480   for (unsigned int j = 0; j < 48; ++j) {
1481     int key = j + 32;
1482     std::string value;
1483     value.append("val_l2_" + std::to_string(key));
1484     if (key % 3 == 0) {
1485       value.append(",");
1486       value.append("val_l1_" + std::to_string(key));
1487     }
1488     if (key % 5 == 0) {
1489       value.append(",");
1490       value.append("val_l0_" + std::to_string(key));
1491     }
1492     if (key % 9 == 0) {
1493       value.append(",");
1494       value.append("val_mem_" + std::to_string(key));
1495     }
1496     ASSERT_EQ(values[j], value);
1497   }
1498 }
1499 
1500 // Test class for batched MultiGet with prefix extractor
1501 // Param bool - If true, use partitioned filters
1502 //              If false, use full filter block
1503 class MultiGetPrefixExtractorTest : public DBBasicTest,
1504                                     public ::testing::WithParamInterface<bool> {
1505 };
1506 
1507 TEST_P(MultiGetPrefixExtractorTest, Batched) {
1508   Options options = CurrentOptions();
1509   options.prefix_extractor.reset(NewFixedPrefixTransform(2));
1510   options.memtable_prefix_bloom_size_ratio = 10;
1511   BlockBasedTableOptions bbto;
1512   if (GetParam()) {
1513     bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
1514     bbto.partition_filters = true;
1515   }
1516   bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
1517   bbto.whole_key_filtering = false;
1518   bbto.cache_index_and_filter_blocks = false;
1519   options.table_factory.reset(NewBlockBasedTableFactory(bbto));
1520   Reopen(options);
1521 
1522   SetPerfLevel(kEnableCount);
1523   get_perf_context()->Reset();
1524 
1525   // First key is not in the prefix_extractor domain
1526   ASSERT_OK(Put("k", "v0"));
1527   ASSERT_OK(Put("kk1", "v1"));
1528   ASSERT_OK(Put("kk2", "v2"));
1529   ASSERT_OK(Put("kk3", "v3"));
1530   ASSERT_OK(Put("kk4", "v4"));
1531   std::vector<std::string> mem_keys(
1532       {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
1533   std::vector<std::string> inmem_values;
1534   inmem_values = MultiGet(mem_keys, nullptr);
1535   ASSERT_EQ(inmem_values[0], "v0");
1536   ASSERT_EQ(inmem_values[1], "v1");
1537   ASSERT_EQ(inmem_values[2], "v2");
1538   ASSERT_EQ(inmem_values[3], "v3");
1539   ASSERT_EQ(inmem_values[4], "v4");
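  // "rofl" and "lmho" were never inserted, so their prefixes are absent from
  // the memtable prefix bloom and account for the two misses below.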
1540   ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
1541   ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
1542   ASSERT_OK(Flush());
1543 
1544   std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
1545   std::vector<std::string> values;
1546   get_perf_context()->Reset();
1547   values = MultiGet(keys, nullptr);
1548   ASSERT_EQ(values[0], "v0");
1549   ASSERT_EQ(values[1], "v1");
1550   ASSERT_EQ(values[2], "v2");
1551   ASSERT_EQ(values[3], "v3");
1552   ASSERT_EQ(values[4], "v4");
1553   // Filter hits for 4 in-domain keys
1554   ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
1555 }
1556 
1557 INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
1558                         ::testing::Bool());
1559 
1560 #ifndef ROCKSDB_LITE
1561 class DBMultiGetRowCacheTest : public DBBasicTest,
1562                                public ::testing::WithParamInterface<bool> {};
1563 
1564 TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
1565   do {
1566     option_config_ = kRowCache;
1567     Options options = CurrentOptions();
1568     options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1569     CreateAndReopenWithCF({"pikachu"}, options);
1570     SetPerfLevel(kEnableCount);
1571     ASSERT_OK(Put(1, "k1", "v1"));
1572     ASSERT_OK(Put(1, "k2", "v2"));
1573     ASSERT_OK(Put(1, "k3", "v3"));
1574     ASSERT_OK(Put(1, "k4", "v4"));
1575     Flush(1);
1576     ASSERT_OK(Put(1, "k5", "v5"));
1577     const Snapshot* snap1 = dbfull()->GetSnapshot();
1578     ASSERT_OK(Delete(1, "k4"));
1579     Flush(1);
1580     const Snapshot* snap2 = dbfull()->GetSnapshot();
1581 
1582     get_perf_context()->Reset();
1583 
1584     std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
1585     std::vector<PinnableSlice> values(keys.size());
1586     std::vector<ColumnFamilyHandle*> cfs(keys.size(), handles_[1]);
1587     std::vector<Status> s(keys.size());
1588 
1589     ReadOptions ro;
1590     bool use_snapshots = GetParam();
1591     if (use_snapshots) {
1592       ro.snapshot = snap2;
1593     }
1594     db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
1595                   s.data(), false);
1596 
1597     ASSERT_EQ(values.size(), keys.size());
1598     ASSERT_EQ(std::string(values[4].data(), values[4].size()), "v1");
1599     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v3");
1600     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1601     // Three key-value pairs are found, at two bytes per value = 6 bytes
1602     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
1603 
1604     ASSERT_TRUE(s[0].IsNotFound());
1605     ASSERT_OK(s[1]);
1606     ASSERT_TRUE(s[2].IsNotFound());
1607     ASSERT_OK(s[3]);
1608     ASSERT_OK(s[4]);
1609 
1610     // Call MultiGet() again with some intersection with the previous set of
1611     // keys. Those should already be in the row cache.
1612     keys.assign({"no_key", "k5", "k3", "k2"});
1613     for (size_t i = 0; i < keys.size(); ++i) {
1614       values[i].Reset();
1615       s[i] = Status::OK();
1616     }
1617     get_perf_context()->Reset();
1618 
1619     if (use_snapshots) {
1620       ro.snapshot = snap1;
1621     }
1622     db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
1623                   values.data(), s.data(), false);
1624 
1625     ASSERT_EQ(std::string(values[3].data(), values[3].size()), "v2");
1626     ASSERT_EQ(std::string(values[2].data(), values[2].size()), "v3");
1627     ASSERT_EQ(std::string(values[1].data(), values[1].size()), "v5");
1628     // Three key-value pairs are found, at two bytes per value = 6 bytes
1629     ASSERT_EQ(6, (int)get_perf_context()->multiget_read_bytes);
1630 
1631     ASSERT_TRUE(s[0].IsNotFound());
1632     ASSERT_OK(s[1]);
1633     ASSERT_OK(s[2]);
1634     ASSERT_OK(s[3]);
1635     if (use_snapshots) {
1636       // Only reads from the first SST file would have been cached, since
1637       // snapshot seq no is > fd.largest_seqno
1638       ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
1639     } else {
1640       ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
1641     }
1642 
1643     SetPerfLevel(kDisable);
1644     dbfull()->ReleaseSnapshot(snap1);
1645     dbfull()->ReleaseSnapshot(snap2);
1646   } while (ChangeCompactOptions());
1647 }
1648 
1649 INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
1650                         testing::Values(true, false));
1651 
1652 TEST_F(DBBasicTest, GetAllKeyVersions) {
1653   Options options = CurrentOptions();
1654   options.env = env_;
1655   options.create_if_missing = true;
1656   options.disable_auto_compactions = true;
1657   CreateAndReopenWithCF({"pikachu"}, options);
1658   ASSERT_EQ(2, handles_.size());
1659   const size_t kNumInserts = 4;
1660   const size_t kNumDeletes = 4;
1661   const size_t kNumUpdates = 4;
1662 
1663   // Check default column family
1664   for (size_t i = 0; i != kNumInserts; ++i) {
1665     ASSERT_OK(Put(std::to_string(i), "value"));
1666   }
1667   for (size_t i = 0; i != kNumUpdates; ++i) {
1668     ASSERT_OK(Put(std::to_string(i), "value1"));
1669   }
1670   for (size_t i = 0; i != kNumDeletes; ++i) {
1671     ASSERT_OK(Delete(std::to_string(i)));
1672   }
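  // With auto compactions disabled, each of the 4 puts, 4 overwrites and
  // 4 deletes above remains a distinct version, so GetAllKeyVersions is
  // expected to report all 12 entries.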
1673   std::vector<KeyVersion> key_versions;
1674   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1675       db_, Slice(), Slice(), std::numeric_limits<size_t>::max(),
1676       &key_versions));
1677   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
1678   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1679       db_, handles_[0], Slice(), Slice(), std::numeric_limits<size_t>::max(),
1680       &key_versions));
1681   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
1682 
1683   // Check non-default column family
1684   for (size_t i = 0; i != kNumInserts - 1; ++i) {
1685     ASSERT_OK(Put(1, std::to_string(i), "value"));
1686   }
1687   for (size_t i = 0; i != kNumUpdates - 1; ++i) {
1688     ASSERT_OK(Put(1, std::to_string(i), "value1"));
1689   }
1690   for (size_t i = 0; i != kNumDeletes - 1; ++i) {
1691     ASSERT_OK(Delete(1, std::to_string(i)));
1692   }
1693   ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
1694       db_, handles_[1], Slice(), Slice(), std::numeric_limits<size_t>::max(),
1695       &key_versions));
1696   ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
1697 }
1698 #endif  // !ROCKSDB_LITE
1699 
1700 TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
1701   Options options = CurrentOptions();
1702   Random rnd(301);
1703   BlockBasedTableOptions table_options;
1704   table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1705   table_options.block_size = 16 * 1024;
1706   ASSERT_TRUE(table_options.block_size >
1707               BlockBasedTable::kMultiGetReadStackBufSize);
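  // With blocks larger than the stack buffer MultiGet uses for reads, the
  // read path has to fall back to a heap-allocated buffer, which is the
  // scenario this test exercises.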
1708   options.table_factory.reset(new BlockBasedTableFactory(table_options));
1709   Reopen(options);
1710 
1711   std::string zero_str(128, '\0');
1712   for (int i = 0; i < 100; ++i) {
1713     // Make the value compressible. A purely random string doesn't compress
1714     // and the resultant data block will not be compressed
1715     std::string value(RandomString(&rnd, 128) + zero_str);
1716     assert(Put(Key(i), value) == Status::OK());
1717   }
1718   Flush();
1719 
1720   std::vector<std::string> key_data(10);
1721   std::vector<Slice> keys;
1722   // We cannot resize a PinnableSlice vector, so just set the initial size
1723   // to the largest we think we will need
1724   std::vector<PinnableSlice> values(10);
1725   std::vector<Status> statuses;
1726   ReadOptions ro;
1727 
1728   // Warm up the cache first
1729   key_data.emplace_back(Key(0));
1730   keys.emplace_back(Slice(key_data.back()));
1731   key_data.emplace_back(Key(50));
1732   keys.emplace_back(Slice(key_data.back()));
1733   statuses.resize(keys.size());
1734 
1735   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
1736                      keys.data(), values.data(), statuses.data(), true);
1737 }
1738 
1739 TEST_F(DBBasicTest, IncrementalRecoveryNoCorrupt) {
1740   Options options = CurrentOptions();
1741   DestroyAndReopen(options);
1742   CreateAndReopenWithCF({"pikachu", "eevee"}, options);
1743   size_t num_cfs = handles_.size();
1744   ASSERT_EQ(3, num_cfs);
1745   WriteOptions write_opts;
1746   write_opts.disableWAL = true;
1747   for (size_t cf = 0; cf != num_cfs; ++cf) {
1748     for (size_t i = 0; i != 10000; ++i) {
1749       std::string key_str = Key(static_cast<int>(i));
1750       std::string value_str = std::to_string(cf) + "_" + std::to_string(i);
1751 
1752       ASSERT_OK(Put(static_cast<int>(cf), key_str, value_str));
1753       if (0 == (i % 1000)) {
1754         ASSERT_OK(Flush(static_cast<int>(cf)));
1755       }
1756     }
1757   }
1758   for (size_t cf = 0; cf != num_cfs; ++cf) {
1759     ASSERT_OK(Flush(static_cast<int>(cf)));
1760   }
1761   Close();
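  // Reopen with best-efforts recovery. All SST files are intact and every
  // column family was flushed, so all keys written above (with the WAL
  // disabled) should still be readable.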
1762   options.best_efforts_recovery = true;
1763   ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu", "eevee"},
1764                            options);
1765   num_cfs = handles_.size();
1766   ASSERT_EQ(3, num_cfs);
1767   for (size_t cf = 0; cf != num_cfs; ++cf) {
1768     for (int i = 0; i != 10000; ++i) {
1769       std::string key_str = Key(static_cast<int>(i));
1770       std::string expected_value_str =
1771           std::to_string(cf) + "_" + std::to_string(i);
1772       ASSERT_EQ(expected_value_str, Get(static_cast<int>(cf), key_str));
1773     }
1774   }
1775 }
1776 
1777 #ifndef ROCKSDB_LITE
1778 namespace {
1779 class TableFileListener : public EventListener {
1780  public:
1781   void OnTableFileCreated(const TableFileCreationInfo& info) override {
1782     InstrumentedMutexLock lock(&mutex_);
1783     cf_to_paths_[info.cf_name].push_back(info.file_path);
1784   }
1785   std::vector<std::string>& GetFiles(const std::string& cf_name) {
1786     InstrumentedMutexLock lock(&mutex_);
1787     return cf_to_paths_[cf_name];
1788   }
1789 
1790  private:
1791   InstrumentedMutex mutex_;
1792   std::unordered_map<std::string, std::vector<std::string>> cf_to_paths_;
1793 };
1794 }  // namespace
1795 
1796 TEST_F(DBBasicTest, RecoverWithMissingFiles) {
1797   Options options = CurrentOptions();
1798   DestroyAndReopen(options);
1799   TableFileListener* listener = new TableFileListener();
1800   // Disable auto compaction to simplify SST file name tracking.
1801   options.disable_auto_compactions = true;
1802   options.listeners.emplace_back(listener);
1803   CreateAndReopenWithCF({"pikachu", "eevee"}, options);
1804   std::vector<std::string> all_cf_names = {kDefaultColumnFamilyName, "pikachu",
1805                                            "eevee"};
1806   size_t num_cfs = handles_.size();
1807   ASSERT_EQ(3, num_cfs);
1808   for (size_t cf = 0; cf != num_cfs; ++cf) {
1809     ASSERT_OK(Put(static_cast<int>(cf), "a", "0_value"));
1810     ASSERT_OK(Flush(static_cast<int>(cf)));
1811     ASSERT_OK(Put(static_cast<int>(cf), "b", "0_value"));
1812     ASSERT_OK(Flush(static_cast<int>(cf)));
1813     ASSERT_OK(Put(static_cast<int>(cf), "c", "0_value"));
1814     ASSERT_OK(Flush(static_cast<int>(cf)));
1815   }
1816 
1817   // Delete files
1818   for (size_t i = 0; i < all_cf_names.size(); ++i) {
1819     std::vector<std::string>& files = listener->GetFiles(all_cf_names[i]);
1820     ASSERT_EQ(3, files.size());
1821     for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
1822          --j) {
1823       ASSERT_OK(env_->DeleteFile(files[j]));
1824     }
1825   }
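  // After the deletions above, column family i keeps only its i oldest SST
  // files: "default" has none left, "pikachu" keeps the file containing "a",
  // and "eevee" keeps the files containing "a" and "b".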
1826   options.best_efforts_recovery = true;
1827   ReopenWithColumnFamilies(all_cf_names, options);
1828   // Verify data
1829   ReadOptions read_opts;
1830   read_opts.total_order_seek = true;
1831   {
1832     std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
1833     iter->SeekToFirst();
1834     ASSERT_FALSE(iter->Valid());
1835     iter.reset(db_->NewIterator(read_opts, handles_[1]));
1836     iter->SeekToFirst();
1837     ASSERT_TRUE(iter->Valid());
1838     ASSERT_EQ("a", iter->key());
1839     iter->Next();
1840     ASSERT_FALSE(iter->Valid());
1841     iter.reset(db_->NewIterator(read_opts, handles_[2]));
1842     iter->SeekToFirst();
1843     ASSERT_TRUE(iter->Valid());
1844     ASSERT_EQ("a", iter->key());
1845     iter->Next();
1846     ASSERT_TRUE(iter->Valid());
1847     ASSERT_EQ("b", iter->key());
1848     iter->Next();
1849     ASSERT_FALSE(iter->Valid());
1850   }
1851 }
1852 
1853 TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
1854   Options options = CurrentOptions();
1855   DestroyAndReopen(options);
1856   TableFileListener* listener = new TableFileListener();
1857   options.listeners.emplace_back(listener);
1858   CreateAndReopenWithCF({"pikachu"}, options);
1859   std::vector<std::string> kAllCfNames = {kDefaultColumnFamilyName, "pikachu"};
1860   size_t num_cfs = handles_.size();
1861   ASSERT_EQ(2, num_cfs);
1862   for (int cf = 0; cf < static_cast<int>(kAllCfNames.size()); ++cf) {
1863     ASSERT_OK(Put(cf, "a", "0_value"));
1864     ASSERT_OK(Flush(cf));
1865     ASSERT_OK(Put(cf, "b", "0_value"));
1866   }
1867   // Delete files
1868   for (size_t i = 0; i < kAllCfNames.size(); ++i) {
1869     std::vector<std::string>& files = listener->GetFiles(kAllCfNames[i]);
1870     ASSERT_EQ(1, files.size());
1871     for (int j = static_cast<int>(files.size() - 1); j >= static_cast<int>(i);
1872          --j) {
1873       ASSERT_OK(env_->DeleteFile(files[j]));
1874     }
1875   }
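  // Only the default column family's SST file (containing "a") was deleted.
  // With best-efforts recovery the WAL holding the unflushed "b" entries is
  // skipped, which the iterators below verify.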
1876   options.best_efforts_recovery = true;
1877   ReopenWithColumnFamilies(kAllCfNames, options);
1878   // Verify WAL is not applied
1879   ReadOptions read_opts;
1880   read_opts.total_order_seek = true;
1881   std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
1882   iter->SeekToFirst();
1883   ASSERT_FALSE(iter->Valid());
1884   iter.reset(db_->NewIterator(read_opts, handles_[1]));
1885   iter->SeekToFirst();
1886   ASSERT_TRUE(iter->Valid());
1887   ASSERT_EQ("a", iter->key());
1888   iter->Next();
1889   ASSERT_FALSE(iter->Valid());
1890 }
1891 #endif  // !ROCKSDB_LITE
1892 
1893 class DBBasicTestWithParallelIO
1894     : public DBTestBase,
1895       public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
1896  public:
1897   DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
1898     bool compressed_cache = std::get<0>(GetParam());
1899     bool uncompressed_cache = std::get<1>(GetParam());
1900     compression_enabled_ = std::get<2>(GetParam());
1901     fill_cache_ = std::get<3>(GetParam());
1902 
1903     if (compressed_cache) {
1904       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
1905       compressed_cache_ = std::make_shared<MyBlockCache>(cache);
1906     }
1907     if (uncompressed_cache) {
1908       std::shared_ptr<Cache> cache = NewLRUCache(1048576);
1909       uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
1910     }
1911 
1912     env_->count_random_reads_ = true;
1913 
1914     Options options = CurrentOptions();
1915     Random rnd(301);
1916     BlockBasedTableOptions table_options;
1917 
1918 #ifndef ROCKSDB_LITE
1919     if (compression_enabled_) {
1920       std::vector<CompressionType> compression_types;
1921       compression_types = GetSupportedCompressions();
1922       // Not every platform may have compression libraries available, so
1923       // dynamically pick based on what's available
1924       CompressionType tmp_type = kNoCompression;
1925       for (auto c_type : compression_types) {
1926         if (c_type != kNoCompression) {
1927           tmp_type = c_type;
1928           break;
1929         }
1930       }
1931       if (tmp_type != kNoCompression) {
1932         options.compression = tmp_type;
1933       } else {
1934         compression_enabled_ = false;
1935       }
1936     }
1937 #else
1938     // GetSupportedCompressions() is not available in LITE build
1939     if (!Snappy_Supported()) {
1940       compression_enabled_ = false;
1941     }
1942 #endif  // ROCKSDB_LITE
1943 
1944     table_options.block_cache = uncompressed_cache_;
1945     if (table_options.block_cache == nullptr) {
1946       table_options.no_block_cache = true;
1947     } else {
1948       table_options.pin_l0_filter_and_index_blocks_in_cache = true;
1949     }
1950     table_options.block_cache_compressed = compressed_cache_;
1951     table_options.flush_block_policy_factory.reset(
1952         new MyFlushBlockPolicyFactory());
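    // MyFlushBlockPolicy cuts a data block after every 10 keys, so the
    // MultiGet tests below touch a predictable set of blocks per batch.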
1953     options.table_factory.reset(new BlockBasedTableFactory(table_options));
1954     if (!compression_enabled_) {
1955       options.compression = kNoCompression;
1956     }
1957     Reopen(options);
1958 
1959     std::string zero_str(128, '\0');
1960     for (int i = 0; i < 100; ++i) {
1961       // Make the value compressible. A purely random string doesn't compress
1962       // and the resultant data block will not be compressed
1963       values_.emplace_back(RandomString(&rnd, 128) + zero_str);
1964       assert(Put(Key(i), values_[i]) == Status::OK());
1965     }
1966     Flush();
1967 
1968     for (int i = 0; i < 100; ++i) {
1969       // Incompressible value: the block cannot gain space by compression
1970       uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
1971       std::string tmp_key = "a" + Key(i);
1972       assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
1973     }
1974     Flush();
1975   }
1976 
1977   bool CheckValue(int i, const std::string& value) {
1978     if (values_[i].compare(value) == 0) {
1979       return true;
1980     }
1981     return false;
1982   }
1983 
1984   bool CheckUncompressableValue(int i, const std::string& value) {
1985     if (uncompressable_values_[i].compare(value) == 0) {
1986       return true;
1987     }
1988     return false;
1989   }
1990 
1991   int num_lookups() { return uncompressed_cache_->num_lookups(); }
1992   int num_found() { return uncompressed_cache_->num_found(); }
1993   int num_inserts() { return uncompressed_cache_->num_inserts(); }
1994 
1995   int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
1996   int num_found_compressed() { return compressed_cache_->num_found(); }
1997   int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
1998 
1999   bool fill_cache() { return fill_cache_; }
2000   bool compression_enabled() { return compression_enabled_; }
2001   bool has_compressed_cache() { return compressed_cache_ != nullptr; }
2002   bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
2003 
2004   static void SetUpTestCase() {}
2005   static void TearDownTestCase() {}
2006 
2007  private:
2008   class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
2009    public:
2010     MyFlushBlockPolicyFactory() {}
2011 
2012     virtual const char* Name() const override {
2013       return "MyFlushBlockPolicyFactory";
2014     }
2015 
2016     virtual FlushBlockPolicy* NewFlushBlockPolicy(
2017         const BlockBasedTableOptions& /*table_options*/,
2018         const BlockBuilder& data_block_builder) const override {
2019       return new MyFlushBlockPolicy(data_block_builder);
2020     }
2021   };
2022 
2023   class MyFlushBlockPolicy : public FlushBlockPolicy {
2024    public:
2025     explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
2026         : num_keys_(0), data_block_builder_(data_block_builder) {}
2027 
2028     bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
2029       if (data_block_builder_.empty()) {
2030         // First key in this block
2031         num_keys_ = 1;
2032         return false;
2033       }
2034       // Flush every 10 keys
2035       if (num_keys_ == 10) {
2036         num_keys_ = 1;
2037         return true;
2038       }
2039       num_keys_++;
2040       return false;
2041     }
2042 
2043    private:
2044     int num_keys_;
2045     const BlockBuilder& data_block_builder_;
2046   };
2047 
2048   class MyBlockCache : public Cache {
2049    public:
2050     explicit MyBlockCache(std::shared_ptr<Cache>& target)
2051         : target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
2052 
2053     virtual const char* Name() const override { return "MyBlockCache"; }
2054 
2055     virtual Status Insert(const Slice& key, void* value, size_t charge,
2056                           Deleter* deleter, Handle** handle = nullptr,
2057                           Priority priority = Priority::LOW) override {
2058       num_inserts_++;
2059       return target_->Insert(key, value, charge, deleter, handle, priority);
2060     }
2061 
2062     virtual Handle* Lookup(const Slice& key,
2063                            Statistics* stats = nullptr) override {
2064       num_lookups_++;
2065       Handle* handle = target_->Lookup(key, stats);
2066       if (handle != nullptr) {
2067         num_found_++;
2068       }
2069       return handle;
2070     }
2071 
2072     virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }
2073 
2074     virtual bool Release(Handle* handle, bool force_erase = false) override {
2075       return target_->Release(handle, force_erase);
2076     }
2077 
2078     virtual void* Value(Handle* handle) override {
2079       return target_->Value(handle);
2080     }
2081 
2082     virtual void Erase(const Slice& key) override { target_->Erase(key); }
2083     virtual uint64_t NewId() override { return target_->NewId(); }
2084 
2085     virtual void SetCapacity(size_t capacity) override {
2086       target_->SetCapacity(capacity);
2087     }
2088 
2089     virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
2090       target_->SetStrictCapacityLimit(strict_capacity_limit);
2091     }
2092 
2093     virtual bool HasStrictCapacityLimit() const override {
2094       return target_->HasStrictCapacityLimit();
2095     }
2096 
2097     virtual size_t GetCapacity() const override {
2098       return target_->GetCapacity();
2099     }
2100 
2101     virtual size_t GetUsage() const override { return target_->GetUsage(); }
2102 
2103     virtual size_t GetUsage(Handle* handle) const override {
2104       return target_->GetUsage(handle);
2105     }
2106 
2107     virtual size_t GetPinnedUsage() const override {
2108       return target_->GetPinnedUsage();
2109     }
2110 
2111     virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
2112 
2113     virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
2114                                         bool thread_safe) override {
2115       return target_->ApplyToAllCacheEntries(callback, thread_safe);
2116     }
2117 
2118     virtual void EraseUnRefEntries() override {
2119       return target_->EraseUnRefEntries();
2120     }
2121 
2122     int num_lookups() { return num_lookups_; }
2123 
2124     int num_found() { return num_found_; }
2125 
2126     int num_inserts() { return num_inserts_; }
2127 
2128    private:
2129     std::shared_ptr<Cache> target_;
2130     int num_lookups_;
2131     int num_found_;
2132     int num_inserts_;
2133   };
2134 
2135   std::shared_ptr<MyBlockCache> compressed_cache_;
2136   std::shared_ptr<MyBlockCache> uncompressed_cache_;
2137   bool compression_enabled_;
2138   std::vector<std::string> values_;
2139   std::vector<std::string> uncompressable_values_;
2140   bool fill_cache_;
2141 };
2142 
2143 TEST_P(DBBasicTestWithParallelIO, MultiGet) {
2144   std::vector<std::string> key_data(10);
2145   std::vector<Slice> keys;
2146   // We cannot resize a PinnableSlice vector, so just set the initial size
2147   // to the largest we think we will need
2148   std::vector<PinnableSlice> values(10);
2149   std::vector<Status> statuses;
2150   ReadOptions ro;
2151   ro.fill_cache = fill_cache();
2152 
2153   // Warm up the cache first
2154   key_data.emplace_back(Key(0));
2155   keys.emplace_back(Slice(key_data.back()));
2156   key_data.emplace_back(Key(50));
2157   keys.emplace_back(Slice(key_data.back()));
2158   statuses.resize(keys.size());
2159 
2160   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2161                      keys.data(), values.data(), statuses.data(), true);
2162   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
2163   ASSERT_TRUE(CheckValue(50, values[1].ToString()));
2164 
2165   int random_reads = env_->random_read_counter_.Read();
2166   key_data[0] = Key(1);
2167   key_data[1] = Key(51);
2168   keys[0] = Slice(key_data[0]);
2169   keys[1] = Slice(key_data[1]);
2170   values[0].Reset();
2171   values[1].Reset();
2172   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2173                      keys.data(), values.data(), statuses.data(), true);
2174   ASSERT_TRUE(CheckValue(1, values[0].ToString()));
2175   ASSERT_TRUE(CheckValue(51, values[1].ToString()));
2176 
2177   bool read_from_cache = false;
2178   if (fill_cache()) {
2179     if (has_uncompressed_cache()) {
2180       read_from_cache = true;
2181     } else if (has_compressed_cache() && compression_enabled()) {
2182       read_from_cache = true;
2183     }
2184   }
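  // Keys 1 and 51 fall in the same data blocks as the warmed-up keys 0 and
  // 50 (blocks hold 10 keys each), so with a usable cache no additional file
  // reads are needed; otherwise two block reads occur.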
2185 
2186   int expected_reads = random_reads + (read_from_cache ? 0 : 2);
2187   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2188 
2189   keys.resize(10);
2190   statuses.resize(10);
2191   std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
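  // These ten keys span four data blocks (1-2, 15-16, 55, 81-85), two of
  // which overlap with blocks already read above; the exact number of extra
  // file reads below depends on which caches are enabled and whether the
  // blocks were compressed.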
2192   for (size_t i = 0; i < key_ints.size(); ++i) {
2193     key_data[i] = Key(key_ints[i]);
2194     keys[i] = Slice(key_data[i]);
2195     statuses[i] = Status::OK();
2196     values[i].Reset();
2197   }
2198   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2199                      keys.data(), values.data(), statuses.data(), true);
2200   for (size_t i = 0; i < key_ints.size(); ++i) {
2201     ASSERT_OK(statuses[i]);
2202     ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
2203   }
2204   if (compression_enabled() && !has_compressed_cache()) {
2205     expected_reads += (read_from_cache ? 2 : 3);
2206   } else {
2207     expected_reads += (read_from_cache ? 2 : 4);
2208   }
2209   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2210 
2211   keys.resize(10);
2212   statuses.resize(10);
2213   std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
2214   for (size_t i = 0; i < key_uncmp.size(); ++i) {
2215     key_data[i] = "a" + Key(key_uncmp[i]);
2216     keys[i] = Slice(key_data[i]);
2217     statuses[i] = Status::OK();
2218     values[i].Reset();
2219   }
2220   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2221                      keys.data(), values.data(), statuses.data(), true);
2222   for (size_t i = 0; i < key_uncmp.size(); ++i) {
2223     ASSERT_OK(statuses[i]);
2224     ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
2225   }
2226   if (compression_enabled() && !has_compressed_cache()) {
2227     expected_reads += (read_from_cache ? 3 : 3);
2228   } else {
2229     expected_reads += (read_from_cache ? 4 : 4);
2230   }
2231   ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2232 
2233   keys.resize(5);
2234   statuses.resize(5);
2235   std::vector<int> key_tr{1, 2, 15, 16, 55};
2236   for (size_t i = 0; i < key_tr.size(); ++i) {
2237     key_data[i] = "a" + Key(key_tr[i]);
2238     keys[i] = Slice(key_data[i]);
2239     statuses[i] = Status::OK();
2240     values[i].Reset();
2241   }
2242   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2243                      keys.data(), values.data(), statuses.data(), true);
2244   for (size_t i = 0; i < key_tr.size(); ++i) {
2245     ASSERT_OK(statuses[i]);
2246     ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
2247   }
2248   if (compression_enabled() && !has_compressed_cache()) {
2249     expected_reads += (read_from_cache ? 0 : 2);
2250     ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2251   } else {
2252     if (has_uncompressed_cache()) {
2253       expected_reads += (read_from_cache ? 0 : 3);
2254       ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
2255     } else {
2256       // A rare case: even with block compression enabled, some data
2257       // blocks may not be compressed because of their content. If the
2258       // user only enables the compressed cache, the uncompressed blocks
2259       // will not be cached, and block reads will be triggered. The number
2260       // of reads depends on the compression algorithm.
2261       ASSERT_TRUE(env_->random_read_counter_.Read() >= expected_reads);
2262     }
2263   }
2264 }
2265 
2266 TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
2267   std::vector<std::string> key_data(10);
2268   std::vector<Slice> keys;
2269   // We cannot resize a PinnableSlice vector, so just set the initial size
2270   // to the largest we think we will need
2271   std::vector<PinnableSlice> values(10);
2272   std::vector<Status> statuses;
2273   int read_count = 0;
2274   ReadOptions ro;
2275   ro.fill_cache = fill_cache();
2276 
2277   SyncPoint::GetInstance()->SetCallBack(
2278       "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
2279         Status* s = static_cast<Status*>(status);
2280         read_count++;
2281         if (read_count == 2) {
2282           *s = Status::Corruption();
2283         }
2284       });
2285   SyncPoint::GetInstance()->EnableProcessing();
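  // The callback above forces a Corruption status on the second block read,
  // so the key served from that block should fail while the first key is
  // still read successfully.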
2286 
2287   // Warm up the cache first
2288   key_data.emplace_back(Key(0));
2289   keys.emplace_back(Slice(key_data.back()));
2290   key_data.emplace_back(Key(50));
2291   keys.emplace_back(Slice(key_data.back()));
2292   statuses.resize(keys.size());
2293 
2294   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2295                      keys.data(), values.data(), statuses.data(), true);
2296   ASSERT_TRUE(CheckValue(0, values[0].ToString()));
2297   // ASSERT_TRUE(CheckValue(50, values[1].ToString()));
2298   ASSERT_EQ(statuses[0], Status::OK());
2299   ASSERT_EQ(statuses[1], Status::Corruption());
2300 
2301   SyncPoint::GetInstance()->DisableProcessing();
2302 }
2303 
2304 TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
2305   std::vector<std::string> key_data(10);
2306   std::vector<Slice> keys;
2307   // We cannot resize a PinnableSlice vector, so just set the initial size
2308   // to the largest we think we will need
2309   std::vector<PinnableSlice> values(10);
2310   std::vector<Status> statuses;
2311   ReadOptions ro;
2312   ro.fill_cache = fill_cache();
2313 
2314   SyncPoint::GetInstance()->SetCallBack(
2315       "TableCache::MultiGet:FindTable", [&](void* status) {
2316         Status* s = static_cast<Status*>(status);
2317         *s = Status::IOError();
2318       });
2319   // DB open will create table readers unless we reduce the table cache
2320   // capacity.
2321   // SanitizeOptions will set max_open_files to minimum of 20. Table cache
2322   // is allocated with max_open_files - 10 as capacity. So override
2323   // max_open_files to 11 so table cache capacity will become 1. This will
2324   // prevent file open during DB open and force the file to be opened
2325   // during MultiGet
2326   SyncPoint::GetInstance()->SetCallBack(
2327       "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
2328         int* max_open_files = (int*)arg;
2329         *max_open_files = 11;
2330       });
2331   SyncPoint::GetInstance()->EnableProcessing();
2332 
2333   Reopen(CurrentOptions());
2334 
2335   // Warm up the cache first
2336   key_data.emplace_back(Key(0));
2337   keys.emplace_back(Slice(key_data.back()));
2338   key_data.emplace_back(Key(50));
2339   keys.emplace_back(Slice(key_data.back()));
2340   statuses.resize(keys.size());
2341 
2342   dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
2343                      keys.data(), values.data(), statuses.data(), true);
2344   ASSERT_EQ(statuses[0], Status::IOError());
2345   ASSERT_EQ(statuses[1], Status::IOError());
2346 
2347   SyncPoint::GetInstance()->DisableProcessing();
2348 }
2349 
2350 INSTANTIATE_TEST_CASE_P(ParallelIO, DBBasicTestWithParallelIO,
2351                         // Params are as follows -
2352                         // Param 0 - Compressed cache enabled
2353                         // Param 1 - Uncompressed cache enabled
2354                         // Param 2 - Data compression enabled
2355                         // Param 3 - ReadOptions::fill_cache
2356                         ::testing::Combine(::testing::Bool(), ::testing::Bool(),
2357                                            ::testing::Bool(),
2358                                            ::testing::Bool()));
2359 
2360 
2361 }  // namespace ROCKSDB_NAMESPACE
2362 
2363 #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2364 extern "C" {
2365 void RegisterCustomObjects(int argc, char** argv);
2366 }
2367 #else
2368 void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
2369 #endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
2370 
2371 int main(int argc, char** argv) {
2372   ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2373   ::testing::InitGoogleTest(&argc, argv);
2374   RegisterCustomObjects(argc, argv);
2375   return RUN_ALL_TESTS();
2376 }
2377