1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include <algorithm>
9 #include <chrono>
10 #include <cstdlib>
11 #include <iomanip>
12 #include <map>
13 #include <memory>
14 #include <sstream>
15 #include <string>
16 #include <vector>
17 
18 #include "db/blob/blob_index.h"
19 #include "db/db_test_util.h"
20 #include "env/composite_env_wrapper.h"
21 #include "file/file_util.h"
22 #include "file/sst_file_manager_impl.h"
23 #include "port/port.h"
24 #include "rocksdb/utilities/debug.h"
25 #include "test_util/fault_injection_test_env.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "util/cast_util.h"
29 #include "util/random.h"
30 #include "util/string_util.h"
31 #include "utilities/blob_db/blob_db.h"
32 #include "utilities/blob_db/blob_db_impl.h"
33 
34 namespace ROCKSDB_NAMESPACE {
35 namespace blob_db {
36 
37 class BlobDBTest : public testing::Test {
38  public:
39   const int kMaxBlobSize = 1 << 14;
40 
41   struct BlobIndexVersion {
42     BlobIndexVersion() = default;
BlobIndexVersionROCKSDB_NAMESPACE::blob_db::BlobDBTest::BlobIndexVersion43     BlobIndexVersion(std::string _user_key, uint64_t _file_number,
44                      uint64_t _expiration, SequenceNumber _sequence,
45                      ValueType _type)
46         : user_key(std::move(_user_key)),
47           file_number(_file_number),
48           expiration(_expiration),
49           sequence(_sequence),
50           type(_type) {}
51 
52     std::string user_key;
53     uint64_t file_number = kInvalidBlobFileNumber;
54     uint64_t expiration = kNoExpiration;
55     SequenceNumber sequence = 0;
56     ValueType type = kTypeValue;
57   };
58 
BlobDBTest()59   BlobDBTest()
60       : dbname_(test::PerThreadDBPath("blob_db_test")),
61         mock_env_(new MockTimeEnv(Env::Default())),
62         fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
63         blob_db_(nullptr) {
64     Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
65     assert(s.ok());
66   }
67 
~BlobDBTest()68   ~BlobDBTest() override {
69     SyncPoint::GetInstance()->ClearAllCallBacks();
70     Destroy();
71   }
72 
TryOpen(BlobDBOptions bdb_options=BlobDBOptions (),Options options=Options ())73   Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
74                  Options options = Options()) {
75     options.create_if_missing = true;
76     return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
77   }
78 
Open(BlobDBOptions bdb_options=BlobDBOptions (),Options options=Options ())79   void Open(BlobDBOptions bdb_options = BlobDBOptions(),
80             Options options = Options()) {
81     ASSERT_OK(TryOpen(bdb_options, options));
82   }
83 
Reopen(BlobDBOptions bdb_options=BlobDBOptions (),Options options=Options ())84   void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
85               Options options = Options()) {
86     assert(blob_db_ != nullptr);
87     delete blob_db_;
88     blob_db_ = nullptr;
89     Open(bdb_options, options);
90   }
91 
Close()92   void Close() {
93     assert(blob_db_ != nullptr);
94     delete blob_db_;
95     blob_db_ = nullptr;
96   }
97 
Destroy()98   void Destroy() {
99     if (blob_db_) {
100       Options options = blob_db_->GetOptions();
101       BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
102       delete blob_db_;
103       blob_db_ = nullptr;
104       ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
105     }
106   }
107 
blob_db_impl()108   BlobDBImpl *blob_db_impl() {
109     return reinterpret_cast<BlobDBImpl *>(blob_db_);
110   }
111 
Put(const Slice & key,const Slice & value,std::map<std::string,std::string> * data=nullptr)112   Status Put(const Slice &key, const Slice &value,
113              std::map<std::string, std::string> *data = nullptr) {
114     Status s = blob_db_->Put(WriteOptions(), key, value);
115     if (data != nullptr) {
116       (*data)[key.ToString()] = value.ToString();
117     }
118     return s;
119   }
120 
Delete(const std::string & key,std::map<std::string,std::string> * data=nullptr)121   void Delete(const std::string &key,
122               std::map<std::string, std::string> *data = nullptr) {
123     ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
124     if (data != nullptr) {
125       data->erase(key);
126     }
127   }
128 
PutWithTTL(const Slice & key,const Slice & value,uint64_t ttl,std::map<std::string,std::string> * data=nullptr)129   Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
130                     std::map<std::string, std::string> *data = nullptr) {
131     Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
132     if (data != nullptr) {
133       (*data)[key.ToString()] = value.ToString();
134     }
135     return s;
136   }
137 
PutUntil(const Slice & key,const Slice & value,uint64_t expiration)138   Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
139     return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
140   }
141 
PutRandomWithTTL(const std::string & key,uint64_t ttl,Random * rnd,std::map<std::string,std::string> * data=nullptr)142   void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
143                         std::map<std::string, std::string> *data = nullptr) {
144     int len = rnd->Next() % kMaxBlobSize + 1;
145     std::string value = test::RandomHumanReadableString(rnd, len);
146     ASSERT_OK(
147         blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
148     if (data != nullptr) {
149       (*data)[key] = value;
150     }
151   }
152 
PutRandomUntil(const std::string & key,uint64_t expiration,Random * rnd,std::map<std::string,std::string> * data=nullptr)153   void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
154                       std::map<std::string, std::string> *data = nullptr) {
155     int len = rnd->Next() % kMaxBlobSize + 1;
156     std::string value = test::RandomHumanReadableString(rnd, len);
157     ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
158                                  expiration));
159     if (data != nullptr) {
160       (*data)[key] = value;
161     }
162   }
163 
PutRandom(const std::string & key,Random * rnd,std::map<std::string,std::string> * data=nullptr)164   void PutRandom(const std::string &key, Random *rnd,
165                  std::map<std::string, std::string> *data = nullptr) {
166     PutRandom(blob_db_, key, rnd, data);
167   }
168 
PutRandom(DB * db,const std::string & key,Random * rnd,std::map<std::string,std::string> * data=nullptr)169   void PutRandom(DB *db, const std::string &key, Random *rnd,
170                  std::map<std::string, std::string> *data = nullptr) {
171     int len = rnd->Next() % kMaxBlobSize + 1;
172     std::string value = test::RandomHumanReadableString(rnd, len);
173     ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
174     if (data != nullptr) {
175       (*data)[key] = value;
176     }
177   }
178 
PutRandomToWriteBatch(const std::string & key,Random * rnd,WriteBatch * batch,std::map<std::string,std::string> * data=nullptr)179   void PutRandomToWriteBatch(
180       const std::string &key, Random *rnd, WriteBatch *batch,
181       std::map<std::string, std::string> *data = nullptr) {
182     int len = rnd->Next() % kMaxBlobSize + 1;
183     std::string value = test::RandomHumanReadableString(rnd, len);
184     ASSERT_OK(batch->Put(key, value));
185     if (data != nullptr) {
186       (*data)[key] = value;
187     }
188   }
189 
190   // Verify blob db contain expected data and nothing more.
VerifyDB(const std::map<std::string,std::string> & data)191   void VerifyDB(const std::map<std::string, std::string> &data) {
192     VerifyDB(blob_db_, data);
193   }
194 
VerifyDB(DB * db,const std::map<std::string,std::string> & data)195   void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
196     // Verify normal Get
197     auto* cfh = db->DefaultColumnFamily();
198     for (auto &p : data) {
199       PinnableSlice value_slice;
200       ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
201       ASSERT_EQ(p.second, value_slice.ToString());
202       std::string value;
203       ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
204       ASSERT_EQ(p.second, value);
205     }
206 
207     // Verify iterators
208     Iterator *iter = db->NewIterator(ReadOptions());
209     iter->SeekToFirst();
210     for (auto &p : data) {
211       ASSERT_TRUE(iter->Valid());
212       ASSERT_EQ(p.first, iter->key().ToString());
213       ASSERT_EQ(p.second, iter->value().ToString());
214       iter->Next();
215     }
216     ASSERT_FALSE(iter->Valid());
217     ASSERT_OK(iter->status());
218     delete iter;
219   }
220 
VerifyBaseDB(const std::map<std::string,KeyVersion> & expected_versions)221   void VerifyBaseDB(
222       const std::map<std::string, KeyVersion> &expected_versions) {
223     auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
224     DB *db = blob_db_->GetRootDB();
225     const size_t kMaxKeys = 10000;
226     std::vector<KeyVersion> versions;
227     GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
228     ASSERT_EQ(expected_versions.size(), versions.size());
229     size_t i = 0;
230     for (auto &key_version : expected_versions) {
231       const KeyVersion &expected_version = key_version.second;
232       ASSERT_EQ(expected_version.user_key, versions[i].user_key);
233       ASSERT_EQ(expected_version.sequence, versions[i].sequence);
234       ASSERT_EQ(expected_version.type, versions[i].type);
235       if (versions[i].type == kTypeValue) {
236         ASSERT_EQ(expected_version.value, versions[i].value);
237       } else {
238         ASSERT_EQ(kTypeBlobIndex, versions[i].type);
239         PinnableSlice value;
240         ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
241                                               versions[i].value, &value));
242         ASSERT_EQ(expected_version.value, value.ToString());
243       }
244       i++;
245     }
246   }
247 
VerifyBaseDBBlobIndex(const std::map<std::string,BlobIndexVersion> & expected_versions)248   void VerifyBaseDBBlobIndex(
249       const std::map<std::string, BlobIndexVersion> &expected_versions) {
250     const size_t kMaxKeys = 10000;
251     std::vector<KeyVersion> versions;
252     ASSERT_OK(
253         GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
254     ASSERT_EQ(versions.size(), expected_versions.size());
255 
256     size_t i = 0;
257     for (const auto &expected_pair : expected_versions) {
258       const BlobIndexVersion &expected_version = expected_pair.second;
259 
260       ASSERT_EQ(versions[i].user_key, expected_version.user_key);
261       ASSERT_EQ(versions[i].sequence, expected_version.sequence);
262       ASSERT_EQ(versions[i].type, expected_version.type);
263       if (versions[i].type != kTypeBlobIndex) {
264         ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
265         ASSERT_EQ(kNoExpiration, expected_version.expiration);
266 
267         ++i;
268         continue;
269       }
270 
271       BlobIndex blob_index;
272       ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
273 
274       const uint64_t file_number = !blob_index.IsInlined()
275                                        ? blob_index.file_number()
276                                        : kInvalidBlobFileNumber;
277       ASSERT_EQ(file_number, expected_version.file_number);
278 
279       const uint64_t expiration =
280           blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
281       ASSERT_EQ(expiration, expected_version.expiration);
282 
283       ++i;
284     }
285   }
286 
InsertBlobs()287   void InsertBlobs() {
288     WriteOptions wo;
289     std::string value;
290 
291     Random rnd(301);
292     for (size_t i = 0; i < 100000; i++) {
293       uint64_t ttl = rnd.Next() % 86400;
294       PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
295     }
296 
297     for (size_t i = 0; i < 10; i++) {
298       Delete("key" + ToString(i % 500));
299     }
300   }
301 
302   const std::string dbname_;
303   std::unique_ptr<MockTimeEnv> mock_env_;
304   std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
305   BlobDB *blob_db_;
306 };  // class BlobDBTest
307 
// Writes 100 random values through Put() and checks that they can all be read
// back.
TEST_F(BlobDBTest, Put) {
  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;
  Open(blob_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx < 100; ++idx) {
    const std::string key = "key" + ToString(idx);
    PutRandom(key, &rnd, &expected);
  }
  VerifyDB(expected);
}
320 
// Writes values with random TTLs at mock time 50 and verifies at mock time
// 100 that only the entries with ttl > 50 are readable.
TEST_F(BlobDBTest, PutWithTTL) {
  Options db_options;
  db_options.env = mock_env_.get();

  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.blob_file_size = 256 * 1000 * 1000;
  blob_options.min_blob_size = 0;
  blob_options.ttl_range_secs = 1000;
  Open(blob_options, db_options);

  Random rnd(301);
  std::map<std::string, std::string> live_data;
  mock_env_->set_current_time(50);
  for (size_t idx = 0; idx < 100; ++idx) {
    const uint64_t ttl = rnd.Next() % 100;
    // Only entries that outlive the time of verification are tracked.
    std::map<std::string, std::string> *tracker = nullptr;
    if (ttl > 50) {
      tracker = &live_data;
    }
    PutRandomWithTTL("key" + ToString(idx), ttl, &rnd, tracker);
  }

  mock_env_->set_current_time(100);
  auto *impl = static_cast<BlobDBImpl *>(blob_db_);
  auto files = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_TRUE(files[0]->HasTTL());
  ASSERT_OK(impl->TEST_CloseBlobFile(files[0]));
  VerifyDB(live_data);
}
346 
// Writes values with random absolute expirations at mock time 50 and verifies
// at mock time 100 that only the entries with expiration > 100 are readable.
TEST_F(BlobDBTest, PutUntil) {
  Options db_options;
  db_options.env = mock_env_.get();

  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.blob_file_size = 256 * 1000 * 1000;
  blob_options.min_blob_size = 0;
  blob_options.ttl_range_secs = 1000;
  Open(blob_options, db_options);

  Random rnd(301);
  std::map<std::string, std::string> live_data;
  mock_env_->set_current_time(50);
  for (size_t idx = 0; idx < 100; ++idx) {
    const uint64_t expiration = rnd.Next() % 100 + 50;
    // Only entries that outlive the time of verification are tracked.
    std::map<std::string, std::string> *tracker = nullptr;
    if (expiration > 100) {
      tracker = &live_data;
    }
    PutRandomUntil("key" + ToString(idx), expiration, &rnd, tracker);
  }

  mock_env_->set_current_time(100);
  auto *impl = static_cast<BlobDBImpl *>(blob_db_);
  auto files = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_TRUE(files[0]->HasTTL());
  ASSERT_OK(impl->TEST_CloseBlobFile(files[0]));
  VerifyDB(live_data);
}
372 
// Reads values back through the StackableDB interface and checks that the
// PinnableSlice and std::string Get() overloads agree with the written data.
TEST_F(BlobDBTest, StackableDBGet) {
  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;
  Open(blob_options);

  Random rnd(301);
  std::map<std::string, std::string> data;
  for (size_t idx = 0; idx < 100; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, &data);
  }

  StackableDB *stackable = blob_db_;
  for (size_t idx = 0; idx < 100; ++idx) {
    ColumnFamilyHandle *cfh = stackable->DefaultColumnFamily();
    const std::string key = "key" + ToString(idx);

    PinnableSlice pinned;
    ASSERT_OK(stackable->Get(ReadOptions(), cfh, key, &pinned));

    std::string copied;
    ASSERT_OK(stackable->Get(ReadOptions(), cfh, key, &copied));

    ASSERT_EQ(copied, pinned.ToString());
    ASSERT_EQ(copied, data[key]);
  }
}
395 
// Checks the expiration reported by Get(): kNoExpiration for a plain Put()
// and (write time + ttl) for PutWithTTL().
TEST_F(BlobDBTest, GetExpiration) {
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  mock_env_->set_current_time(100);
  Open(bdb_options, options);
  // The writes must succeed, otherwise the reads below fail for an unrelated
  // reason.
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(PutWithTTL("key2", "value2", 200));
  PinnableSlice value;
  uint64_t expiration = 0;
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
  ASSERT_EQ("value1", value.ToString());
  ASSERT_EQ(kNoExpiration, expiration);
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
  ASSERT_EQ("value2", value.ToString());
  ASSERT_EQ(300 /* = 100 + 200 */, expiration);
}
414 
// Get() must report an IOError when the file system is deactivated after the
// value has been written.
TEST_F(BlobDBTest, GetIOError) {
  Options db_options;
  db_options.env = fault_injection_env_.get();

  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;  // Make sure value write to blob file
  Open(blob_options, db_options);

  ColumnFamilyHandle *cfh = blob_db_->DefaultColumnFamily();
  PinnableSlice result;
  ASSERT_OK(Put("foo", "bar"));

  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  const Status read_status = blob_db_->Get(ReadOptions(), cfh, "foo", &result);
  ASSERT_TRUE(read_status.IsIOError());

  // Reactivate file system to allow test to close DB.
  fault_injection_env_->SetFilesystemActive(true);
}
431 
// Put() must report an IOError while the file system is deactivated and
// succeed again once it is reactivated.
TEST_F(BlobDBTest, PutIOError) {
  Options db_options;
  db_options.env = fault_injection_env_.get();

  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;  // Make sure value write to blob file
  Open(blob_options, db_options);

  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  const Status failed_write = Put("foo", "v1");
  ASSERT_TRUE(failed_write.IsIOError());

  fault_injection_env_->SetFilesystemActive(true, Status::IOError());
  ASSERT_OK(Put("bar", "v1"));
}
444 
// Writes 100 batches of 10 random values each and checks that all of them
// can be read back.
TEST_F(BlobDBTest, WriteBatch) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    WriteBatch batch;
    for (size_t j = 0; j < 10; j++) {
      PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
    }
    // Check the write status so that a failed batch is reported here rather
    // than as a data mismatch in VerifyDB().
    ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  }
  VerifyDB(data);
}
461 
// Writes 100 keys, deletes every fifth one and checks that exactly the
// remaining keys are readable.
TEST_F(BlobDBTest, Delete) {
  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;
  Open(blob_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx < 100; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, &expected);
  }
  for (size_t idx = 0; idx < 100; idx += 5) {
    const std::string doomed_key = "key" + ToString(idx);
    Delete(doomed_key, &expected);
  }
  VerifyDB(expected);
}
477 
// Writes 100 keys, deletes all of them through a single WriteBatch and checks
// that the DB is empty afterwards.
TEST_F(BlobDBTest, DeleteBatch) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  for (size_t i = 0; i < 100; i++) {
    PutRandom("key" + ToString(i), &rnd);
  }
  WriteBatch batch;
  for (size_t i = 0; i < 100; i++) {
    // WriteBatch::Delete() returns a Status; do not drop it silently.
    ASSERT_OK(batch.Delete("key" + ToString(i)));
  }
  ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  // DB should be empty.
  VerifyDB({});
}
495 
// Writes 10000 keys twice and checks that the values of the second pass are
// the ones that are read back.
TEST_F(BlobDBTest, Override) {
  BlobDBOptions blob_options;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;
  Open(blob_options);

  Random rnd(301);
  std::map<std::string, std::string> latest;

  // First pass: the values are not tracked because they get overwritten.
  for (int idx = 0; idx < 10000; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, nullptr);
  }
  // override all the keys
  for (int idx = 0; idx < 10000; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, &latest);
  }
  VerifyDB(latest);
}
512 
513 #ifdef SNAPPY
// Writes values through both Put() and WriteBatch with Snappy compression
// enabled and checks that all of them can be read back.
TEST_F(BlobDBTest, Compression) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = CompressionType::kSnappyCompression;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    PutRandom("put-key" + ToString(i), &rnd, &data);
  }
  for (int i = 0; i < 100; i++) {
    WriteBatch batch;
    for (size_t j = 0; j < 10; j++) {
      PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
                            &batch, &data);
    }
    // Check the write status so that a failed batch is reported here rather
    // than as a data mismatch in VerifyDB().
    ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  }
  VerifyDB(data);
}
535 
// Values written with Snappy compression must still be readable after the DB
// is reopened with compression turned off.
TEST_F(BlobDBTest, DecompressAfterReopen) {
  BlobDBOptions blob_options;
  blob_options.compression = CompressionType::kSnappyCompression;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;
  Open(blob_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx < 100; ++idx) {
    PutRandom("put-key" + ToString(idx), &rnd, &expected);
  }
  VerifyDB(expected);

  // Reopen without compression and read the same data again.
  blob_options.compression = CompressionType::kNoCompression;
  Reopen(blob_options);
  VerifyDB(expected);
}
552 #endif
553 
// Runs 10 writer threads concurrently, half of them using Put() and half of
// them using WriteBatch, and checks that all written data can be read back.
TEST_F(BlobDBTest, MultipleWriters) {
  Open(BlobDBOptions());

  constexpr uint32_t kNumWriters = 10;
  std::vector<port::Thread> workers;
  workers.reserve(kNumWriters);
  // Every thread records into its own map, so the maps need no locking.
  std::vector<std::map<std::string, std::string>> data_set(kNumWriters);
  for (uint32_t i = 0; i < kNumWriters; i++) {
    // The thread id is passed by value; everything else is captured by
    // reference and outlives the threads, which are joined below.
    workers.emplace_back(
        [&](uint32_t id) {
          Random rnd(301 + id);
          for (int j = 0; j < 100; j++) {
            std::string key = "key" + ToString(id) + "_" + ToString(j);
            if (id < 5) {
              PutRandom(key, &rnd, &data_set[id]);
            } else {
              WriteBatch batch;
              PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
              ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
            }
          }
        },
        i);
  }
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < kNumWriters; i++) {
    workers[i].join();
    data.insert(data_set[i].begin(), data_set[i].end());
  }
  VerifyDB(data);
}
582 
// Checks that deleting an obsolete blob file, as well as destroying the DB,
// schedules the blob file deletion through the SstFileManager.
TEST_F(BlobDBTest, SstFileManager) {
  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  Options db_options;

  // Count the scheduled deletions whose path contains ".blob".
  int files_scheduled_to_delete = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;

  Open(bdb_options, db_options);

  // Create one obsolete file and clean it.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  std::shared_ptr<BlobFile> bfile = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();

  // Even if SSTFileManager is not set, DB is creating a dummy one.
  ASSERT_EQ(1, files_scheduled_to_delete);
  Destroy();
  // Make sure that DestroyBlobDB() also goes through delete scheduler.
  ASSERT_EQ(2, files_scheduled_to_delete);
  SyncPoint::GetInstance()->DisableProcessing();
  sfm->WaitForEmptyTrash();
}
629 
// Checks that reopening the DB rescans the blob directory for leftover trash
// files and schedules the matching ones for deletion.
TEST_F(BlobDBTest, SstFileManagerRestart) {
  // Count the scheduled deletions whose path contains ".blob".
  int files_scheduled_to_delete = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });

  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  Options db_options;

  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;

  Open(bdb_options, db_options);
  std::string blob_dir = blob_db_impl()->TEST_blob_dir();
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  Close();

  // Create 3 dummy trash files under the blob_dir. Only the two whose names
  // contain ".blob" are counted by the callback above. Creation must succeed,
  // otherwise the counts checked below are meaningless.
  LegacyFileSystemWrapper fs(db_options.env);
  ASSERT_OK(CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false));
  ASSERT_OK(CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true));
  ASSERT_OK(
      CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false));

  // Make sure that reopening the DB rescans the existing trash files.
  Open(bdb_options, db_options);
  ASSERT_EQ(files_scheduled_to_delete, 2);

  sfm->WaitForEmptyTrash();

  // There should be exactly one file under the blob dir now.
  std::vector<std::string> all_files;
  ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
  int nfiles = 0;
  for (const auto &f : all_files) {
    assert(!f.empty());
    // Skip entries whose name starts with a dot.
    if (f[0] == '.') {
      continue;
    }
    nfiles++;
  }
  ASSERT_EQ(nfiles, 1);

  SyncPoint::GetInstance()->DisableProcessing();
}
688 
TEST_F(BlobDBTest,SnapshotAndGarbageCollection)689 TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
690   BlobDBOptions bdb_options;
691   bdb_options.min_blob_size = 0;
692   bdb_options.enable_garbage_collection = true;
693   bdb_options.garbage_collection_cutoff = 1.0;
694   bdb_options.disable_background_tasks = true;
695 
696   // i = when to take snapshot
697   for (int i = 0; i < 4; i++) {
698     Destroy();
699     Open(bdb_options);
700 
701     const Snapshot *snapshot = nullptr;
702 
703     // First file
704     ASSERT_OK(Put("key1", "value"));
705     if (i == 0) {
706       snapshot = blob_db_->GetSnapshot();
707     }
708 
709     auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
710     ASSERT_EQ(1, blob_files.size());
711     ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
712 
713     // Second file
714     ASSERT_OK(Put("key2", "value"));
715     if (i == 1) {
716       snapshot = blob_db_->GetSnapshot();
717     }
718 
719     blob_files = blob_db_impl()->TEST_GetBlobFiles();
720     ASSERT_EQ(2, blob_files.size());
721     auto bfile = blob_files[1];
722     ASSERT_FALSE(bfile->Immutable());
723     ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
724 
725     // Third file
726     ASSERT_OK(Put("key3", "value"));
727     if (i == 2) {
728       snapshot = blob_db_->GetSnapshot();
729     }
730 
731     ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
732     ASSERT_TRUE(bfile->Obsolete());
733     ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
734               bfile->GetObsoleteSequence());
735 
736     Delete("key2");
737     if (i == 3) {
738       snapshot = blob_db_->GetSnapshot();
739     }
740 
741     ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
742     blob_db_impl()->TEST_DeleteObsoleteFiles();
743 
744     if (i >= 2) {
745       // The snapshot shouldn't see data in bfile
746       ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
747       blob_db_->ReleaseSnapshot(snapshot);
748     } else {
749       // The snapshot will see data in bfile, so the file shouldn't be deleted
750       ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
751       blob_db_->ReleaseSnapshot(snapshot);
752       blob_db_impl()->TEST_DeleteObsoleteFiles();
753       ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
754     }
755   }
756 }
757 
// Checks that reads and writes addressed to a non-default column family are
// rejected with NotSupported.
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
  Options options;
  options.env = mock_env_.get();
  mock_env_->set_current_time(0);
  Open(BlobDBOptions(), options);
  ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
  ColumnFamilyHandle *handle = nullptr;
  std::string value;
  std::vector<std::string> values;
  // The call simply pass through to base db. It should succeed.
  ASSERT_OK(
      blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
  ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
  ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
                  .IsNotSupported());
  ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
                  .IsNotSupported());
  // Building the batch itself must succeed; only writing it is expected to
  // be rejected.
  WriteBatch batch;
  ASSERT_OK(batch.Put("k1", "v1"));
  ASSERT_OK(batch.Put(handle, "k2", "v2"));
  ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
  // The rejected batch must not have written its default column family entry.
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
  ASSERT_TRUE(
      blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
  auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
                                     {"k1", "k2"}, &values);
  ASSERT_EQ(2, statuses.size());
  ASSERT_TRUE(statuses[0].IsNotSupported());
  ASSERT_TRUE(statuses[1].IsNotSupported());
  ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
  delete handle;
}
790 
// Checks that the blob file shows up in both GetLiveFilesMetaData() and
// GetLiveFiles() with a path relative to the DB directory.
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
  BlobDBOptions blob_options;
  blob_options.blob_dir = "blob_dir";
  blob_options.path_relative = true;
  blob_options.disable_background_tasks = true;
  blob_options.min_blob_size = 0;
  Open(blob_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx < 100; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, &expected);
  }

  // Path should be relative to db_name, but begin with slash.
  const std::string blob_file_name = "/blob_dir/000001.blob";

  std::vector<LiveFileMetaData> live_metadata;
  blob_db_->GetLiveFilesMetaData(&live_metadata);
  ASSERT_EQ(1U, live_metadata.size());
  const LiveFileMetaData &blob_meta = live_metadata[0];
  ASSERT_EQ(blob_file_name, blob_meta.name);
  ASSERT_EQ(1, blob_meta.file_number);
  ASSERT_EQ("default", blob_meta.column_family_name);

  std::vector<std::string> live_files;
  uint64_t manifest_file_size;
  ASSERT_OK(blob_db_->GetLiveFiles(live_files, &manifest_file_size, false));
  ASSERT_EQ(4U, live_files.size());
  ASSERT_EQ(blob_file_name, live_files[3]);

  VerifyDB(expected);
}
818 
// Verifies that a database written by plain RocksDB can be opened as a
// BlobDB, and checks what plain RocksDB returns afterwards for keys that were
// rewritten through BlobDB.
TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
  constexpr size_t kNumKey = 20;
  constexpr size_t kNumIteration = 10;
  Random rnd(301);
  std::map<std::string, std::string> data;
  // is_blob[i] becomes true once "key<i>" has been written through blob_db_.
  std::vector<bool> is_blob(kNumKey, false);

  // Write to plain rocksdb. The handle is owned by a unique_ptr so that the
  // DB is closed even when a fatal assertion returns from the test early.
  Options options;
  options.create_if_missing = true;
  std::unique_ptr<DB> db;
  {
    DB *raw_db = nullptr;
    ASSERT_OK(DB::Open(options, dbname_, &raw_db));
    db.reset(raw_db);
  }
  for (size_t i = 0; i < kNumIteration; i++) {
    auto key_index = rnd.Next() % kNumKey;
    std::string key = "key" + ToString(key_index);
    PutRandom(db.get(), key, &rnd, &data);
  }
  VerifyDB(db.get(), data);
  // Close the plain DB before reopening the same path as a blob db.
  db.reset();

  // Open as blob db. Verify it can read existing data.
  Open();
  VerifyDB(blob_db_, data);
  for (size_t i = 0; i < kNumIteration; i++) {
    auto key_index = rnd.Next() % kNumKey;
    std::string key = "key" + ToString(key_index);
    is_blob[key_index] = true;
    PutRandom(blob_db_, key, &rnd, &data);
  }
  VerifyDB(blob_db_, data);
  delete blob_db_;
  blob_db_ = nullptr;

  // Verify plain db return error for keys written by blob db.
  {
    DB *raw_db = nullptr;
    ASSERT_OK(DB::Open(options, dbname_, &raw_db));
    db.reset(raw_db);
  }
  std::string value;
  for (size_t i = 0; i < kNumKey; i++) {
    std::string key = "key" + ToString(i);
    Status s = db->Get(ReadOptions(), key, &value);
    if (data.count(key) == 0) {
      // Key was never written by either phase.
      ASSERT_TRUE(s.IsNotFound());
    } else if (is_blob[i]) {
      // Latest version was written through blob db.
      ASSERT_TRUE(s.IsNotSupported());
    } else {
      // Key was only ever written by plain rocksdb.
      ASSERT_OK(s);
      ASSERT_EQ(data[key], value);
    }
  }
  // `db` is closed by the unique_ptr when the test returns.
}
870 
871 // Test to verify that a NoSpace IOError Status is returned on reaching
872 // max_db_size limit.
TEST_F(BlobDBTest, OutOfSpace) {
  // Use mock env to stop wall clock.
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.is_fifo = false;
  bdb_options.disable_background_tasks = true;
  // Pass `options` explicitly: otherwise the mock env configured above is
  // never handed to the DB.
  Open(bdb_options, options);

  // Each stored blob has an overhead of about 42 bytes currently.
  // So a small key + a 100 byte blob should take up ~150 bytes in the db.
  std::string value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));

  // Putting another blob should fail as adding it would exceed the
  // max_db_size limit.
  Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
  ASSERT_TRUE(s.IsIOError());
  ASSERT_TRUE(s.IsNoSpace());
}
894 
TEST_F(BlobDBTest,FIFOEviction)895 TEST_F(BlobDBTest, FIFOEviction) {
896   BlobDBOptions bdb_options;
897   bdb_options.max_db_size = 200;
898   bdb_options.blob_file_size = 100;
899   bdb_options.is_fifo = true;
900   bdb_options.disable_background_tasks = true;
901   Open(bdb_options);
902 
903   std::atomic<int> evict_count{0};
904   SyncPoint::GetInstance()->SetCallBack(
905       "BlobDBImpl::EvictOldestBlobFile:Evicted",
906       [&](void *) { evict_count++; });
907   SyncPoint::GetInstance()->EnableProcessing();
908 
909   // Each stored blob has an overhead of 32 bytes currently.
910   // So a 100 byte blob should take up 132 bytes.
911   std::string value(100, 'v');
912   ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
913   VerifyDB({{"key1", value}});
914 
915   ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
916 
917   // Adding another 100 bytes blob would take the total size to 264 bytes
918   // (2*132). max_db_size will be exceeded
919   // than max_db_size and trigger FIFO eviction.
920   ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
921   ASSERT_EQ(1, evict_count);
922   // key1 will exist until corresponding file be deleted.
923   VerifyDB({{"key1", value}, {"key2", value}});
924 
925   // Adding another 100 bytes blob without TTL.
926   ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
927   ASSERT_EQ(2, evict_count);
928   // key1 and key2 will exist until corresponding file be deleted.
929   VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
930 
931   // The fourth blob file, without TTL.
932   ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
933   ASSERT_EQ(3, evict_count);
934   VerifyDB(
935       {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
936 
937   auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
938   ASSERT_EQ(4, blob_files.size());
939   ASSERT_TRUE(blob_files[0]->Obsolete());
940   ASSERT_TRUE(blob_files[1]->Obsolete());
941   ASSERT_TRUE(blob_files[2]->Obsolete());
942   ASSERT_FALSE(blob_files[3]->Obsolete());
943   auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
944   ASSERT_EQ(3, obsolete_files.size());
945   ASSERT_EQ(blob_files[0], obsolete_files[0]);
946   ASSERT_EQ(blob_files[1], obsolete_files[1]);
947   ASSERT_EQ(blob_files[2], obsolete_files[2]);
948 
949   blob_db_impl()->TEST_DeleteObsoleteFiles();
950   obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
951   ASSERT_TRUE(obsolete_files.empty());
952   VerifyDB({{"key4", value}});
953 }
954 
// With FIFO enabled but no blob file available to evict, a write larger than
// max_db_size must fail with NoSpace and must not trigger any eviction.
TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.blob_file_size = 5000;
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  std::atomic<int> evict_count{0};
  // The sync point registry is process-wide and the callback below captures
  // `evict_count` by reference. Unregister it on every exit path (including
  // an early return from a fatal assertion) so it cannot outlive the counter.
  struct SyncPointCleanup {
    ~SyncPointCleanup() {
      SyncPoint::GetInstance()->DisableProcessing();
      SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  } sync_point_cleanup;
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&](void *) { evict_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  // A single 2000 byte value is already larger than max_db_size.
  std::string value(2000, 'v');
  ASSERT_TRUE(Put("foo", value).IsNoSpace());
  ASSERT_EQ(0, evict_count);
}
974 
// FIFO eviction must account for SST file size, and a write must fail with
// NoSpace when evicting the available blob files would not free enough room.
TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  // Use mock env to stop wall clock.
  Options db_options;
  db_options.env = mock_env_.get();
  db_options.disable_auto_compactions = true;
  auto stats = CreateDBStatistics();
  db_options.statistics = stats;

  BlobDBOptions blob_options;
  blob_options.is_fifo = true;
  blob_options.min_blob_size = 100;
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());

  // Insert some data into LSM tree to make sure FIFO eviction take SST
  // file size into account.
  std::map<std::string, std::string> expected;
  std::string inline_value(50, 'v');
  for (int n = 0; n < 1000; ++n) {
    ASSERT_OK(Put("key" + ToString(n), inline_value, &expected));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));

  uint64_t sst_bytes = 0;
  ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
                                       &sst_bytes));
  ASSERT_TRUE(sst_bytes > 0);
  ASSERT_EQ(sst_bytes, blob_db_impl()->TEST_live_sst_size());

  // Reopen with a size cap that leaves 2000 bytes on top of the SST files.
  blob_options.max_db_size = sst_bytes + 2000;
  Reopen(blob_options, db_options);
  ASSERT_EQ(sst_bytes, blob_db_impl()->TEST_live_sst_size());

  std::string one_kb_value(1000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", one_kb_value, 60, &expected));
  ASSERT_EQ(0, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(expected);

  // large_key2 evicts large_key1
  ASSERT_OK(PutWithTTL("large_key2", one_kb_value, 60, &expected));
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  expected.erase("large_key1");
  VerifyDB(expected);

  // large_key3 get no enough space even after evicting large_key2, so it
  // instead return no space error.
  std::string two_kb_value(2000, 'v');
  ASSERT_TRUE(PutWithTTL("large_key3", two_kb_value, 60).IsNoSpace());
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));

  // Verify large_key2 still exists.
  VerifyDB(expected);
}
1025 
1026 // Test flush or compaction will trigger FIFO eviction since they update
1027 // total SST file size.
TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  // Use mock env to stop wall clock.
  Options db_options;
  db_options.env = mock_env_.get();
  auto stats = CreateDBStatistics();
  db_options.statistics = stats;
  db_options.compression = kNoCompression;

  BlobDBOptions blob_options;
  blob_options.max_db_size = 1000;
  blob_options.is_fifo = true;
  blob_options.min_blob_size = 100;
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  // One large value ends up in a single blob file; nothing is evicted yet.
  std::string large_value(800, 'v');
  ASSERT_OK(PutWithTTL("large_key", large_value, 60));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB({{"large_key", large_value}});

  // Insert some small keys and flush to bring DB out of space.
  std::map<std::string, std::string> small_entries;
  for (int n = 0; n < 10; ++n) {
    ASSERT_OK(Put("key" + ToString(n), "v", &small_entries));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));

  // Verify large_key is deleted by FIFO eviction.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
  VerifyDB(small_entries);
}
1061 
// Writes a random mix of small/large values with and without TTL and checks
// which of them are recorded in the base DB as plain values versus blob
// indexes, and that exactly one TTL and one non-TTL blob file were created.
TEST_F(BlobDBTest, InlineSmallValues) {
  constexpr uint64_t kMaxExpiration = 1000;
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = kMaxExpiration;
  bdb_options.min_blob_size = 100;
  bdb_options.blob_file_size = 256 * 1000 * 1000;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  mock_env_->set_current_time(0);
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, KeyVersion> versions;
  for (size_t i = 0; i < 1000; i++) {
    bool is_small_value = rnd.Next() % 2;
    bool has_ttl = rnd.Next() % 2;
    uint64_t expiration = rnd.Next() % kMaxExpiration;
    // 50 bytes is below min_blob_size, 200 bytes is above it.
    int len = is_small_value ? 50 : 200;
    std::string key = "key" + ToString(i);
    std::string value = test::RandomHumanReadableString(&rnd, len);
    data[key] = value;
    // Each write is expected to consume exactly one sequence number.
    SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    if (!has_ttl) {
      ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
    } else {
      ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
    }
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    // Only small values without TTL are expected as plain values; everything
    // else is expected to be recorded as a blob index.
    versions[key] =
        KeyVersion(key, value, sequence,
                   (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
  }
  VerifyDB(data);
  VerifyBaseDB(versions);
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  // The two files may be listed in either order; sort them out by HasTTL().
  std::shared_ptr<BlobFile> non_ttl_file;
  std::shared_ptr<BlobFile> ttl_file;
  if (blob_files[0]->HasTTL()) {
    ttl_file = blob_files[0];
    non_ttl_file = blob_files[1];
  } else {
    non_ttl_file = blob_files[0];
    ttl_file = blob_files[1];
  }
  ASSERT_FALSE(non_ttl_file->HasTTL());
  ASSERT_TRUE(ttl_file->HasTTL());
}
1113 
// Opening a BlobDB must be rejected with NotSupported when the user supplies
// either a compaction filter or a compaction filter factory.
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
  class TestCompactionFilter : public CompactionFilter {
    const char *Name() const override { return "TestCompactionFilter"; }
  };
  class TestCompactionFilterFactory : public CompactionFilterFactory {
    const char *Name() const override { return "TestCompactionFilterFactory"; }
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context & /*context*/) override {
      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
    }
  };

  // First pass installs a raw filter, second pass installs a factory.
  for (const bool use_factory : {false, true}) {
    Options options;
    if (use_factory) {
      options.compaction_filter_factory.reset(
          new TestCompactionFilterFactory());
    } else {
      options.compaction_filter = new TestCompactionFilter();
    }
    const Status open_status = TryOpen(BlobDBOptions(), options);
    ASSERT_TRUE(open_status.IsNotSupported());
    // Null (and therefore a no-op) on the factory pass.
    delete options.compaction_filter;
  }
}
1137 
// Test compaction filter should remove any expired blob index.
TEST_F(BlobDBTest,FilterExpiredBlobIndex)1139 TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
1140   constexpr size_t kNumKeys = 100;
1141   constexpr size_t kNumPuts = 1000;
1142   constexpr uint64_t kMaxExpiration = 1000;
1143   constexpr uint64_t kCompactTime = 500;
1144   constexpr uint64_t kMinBlobSize = 100;
1145   Random rnd(301);
1146   mock_env_->set_current_time(0);
1147   BlobDBOptions bdb_options;
1148   bdb_options.min_blob_size = kMinBlobSize;
1149   bdb_options.disable_background_tasks = true;
1150   Options options;
1151   options.env = mock_env_.get();
1152   Open(bdb_options, options);
1153 
1154   std::map<std::string, std::string> data;
1155   std::map<std::string, std::string> data_after_compact;
1156   for (size_t i = 0; i < kNumPuts; i++) {
1157     bool is_small_value = rnd.Next() % 2;
1158     bool has_ttl = rnd.Next() % 2;
1159     uint64_t expiration = rnd.Next() % kMaxExpiration;
1160     int len = is_small_value ? 10 : 200;
1161     std::string key = "key" + ToString(rnd.Next() % kNumKeys);
1162     std::string value = test::RandomHumanReadableString(&rnd, len);
1163     if (!has_ttl) {
1164       if (is_small_value) {
1165         std::string blob_entry;
1166         BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
1167         // Fake blob index with TTL. See what it will do.
1168         ASSERT_GT(kMinBlobSize, blob_entry.size());
1169         value = blob_entry;
1170       }
1171       ASSERT_OK(Put(key, value));
1172       data_after_compact[key] = value;
1173     } else {
1174       ASSERT_OK(PutUntil(key, value, expiration));
1175       if (expiration <= kCompactTime) {
1176         data_after_compact.erase(key);
1177       } else {
1178         data_after_compact[key] = value;
1179       }
1180     }
1181     data[key] = value;
1182   }
1183   VerifyDB(data);
1184 
1185   mock_env_->set_current_time(kCompactTime);
1186   // Take a snapshot before compaction. Make sure expired blob indexes is
1187   // filtered regardless of snapshot.
1188   const Snapshot *snapshot = blob_db_->GetSnapshot();
1189   // Issue manual compaction to trigger compaction filter.
1190   ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1191   blob_db_->ReleaseSnapshot(snapshot);
1192   // Verify expired blob index are filtered.
1193   std::vector<KeyVersion> versions;
1194   const size_t kMaxKeys = 10000;
1195   GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
1196   ASSERT_EQ(data_after_compact.size(), versions.size());
1197   for (auto &version : versions) {
1198     ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
1199   }
1200   VerifyDB(data_after_compact);
1201 }
1202 
1203 // Test compaction filter should remove any blob index where corresponding
1204 // blob file has been removed.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
  Options db_options;
  db_options.disable_auto_compactions = true;
  BlobDBOptions blob_options;
  blob_options.min_blob_size = 0;
  blob_options.disable_background_tasks = true;
  Open(blob_options, db_options);

  // Write "foo" and close its blob file, so the next write goes to a new one.
  ASSERT_OK(Put("foo", "v1"));
  auto files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_EQ(1, files[0]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(files[0]));

  // Write "bar" into a second blob file and close that one as well.
  ASSERT_OK(Put("bar", "v2"));
  files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, files.size());
  ASSERT_EQ(2, files[1]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(files[1]));

  const size_t kMaxKeys = 10000;
  const std::map<std::string, std::string> both_keys = {{"bar", "v2"},
                                                        {"foo", "v1"}};

  // Both keys are present in the base db before any compaction.
  DB *root_db = blob_db_->GetRootDB();
  std::vector<KeyVersion> key_versions;
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(2, key_versions.size());
  ASSERT_EQ("bar", key_versions[0].user_key);
  ASSERT_EQ("foo", key_versions[1].user_key);
  VerifyDB(both_keys);

  // Compacting while both blob files still exist must not drop anything.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(2, key_versions.size());
  ASSERT_EQ("bar", key_versions[0].user_key);
  ASSERT_EQ("foo", key_versions[1].user_key);
  VerifyDB(both_keys);

  // Remove the first blob file and compact. foo should be removed from base
  // db.
  blob_db_impl()->TEST_ObsoleteBlobFile(files[0]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(1, key_versions.size());
  ASSERT_EQ("bar", key_versions[0].user_key);
  VerifyDB({{"bar", "v2"}});

  // Remove the second blob file and compact. bar should be removed from base
  // db.
  blob_db_impl()->TEST_ObsoleteBlobFile(files[1]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_OK(GetAllKeyVersions(root_db, "", "", kMaxKeys, &key_versions));
  ASSERT_EQ(0, key_versions.size());
  VerifyDB({});
}
1259 
// Test compaction filter should filter any inlined TTL keys that would have
// been dropped by last FIFO eviction if they are stored out-of-line.
TEST_F(BlobDBTest,FilterForFIFOEviction)1262 TEST_F(BlobDBTest, FilterForFIFOEviction) {
1263   Random rnd(215);
1264   BlobDBOptions bdb_options;
1265   bdb_options.min_blob_size = 100;
1266   bdb_options.ttl_range_secs = 60;
1267   bdb_options.max_db_size = 0;
1268   bdb_options.disable_background_tasks = true;
1269   Options options;
1270   // Use mock env to stop wall clock.
1271   mock_env_->set_current_time(0);
1272   options.env = mock_env_.get();
1273   auto statistics = CreateDBStatistics();
1274   options.statistics = statistics;
1275   options.disable_auto_compactions = true;
1276   Open(bdb_options, options);
1277 
1278   std::map<std::string, std::string> data;
1279   std::map<std::string, std::string> data_after_compact;
1280   // Insert some small values that will be inlined.
1281   for (int i = 0; i < 1000; i++) {
1282     std::string key = "key" + ToString(i);
1283     std::string value = test::RandomHumanReadableString(&rnd, 50);
1284     uint64_t ttl = rnd.Next() % 120 + 1;
1285     ASSERT_OK(PutWithTTL(key, value, ttl, &data));
1286     if (ttl >= 60) {
1287       data_after_compact[key] = value;
1288     }
1289   }
1290   uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
1291   ASSERT_OK(blob_db_->Flush(FlushOptions()));
1292   uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
1293   ASSERT_GT(live_sst_size, 0);
1294   VerifyDB(data);
1295 
1296   bdb_options.max_db_size = live_sst_size + 30000;
1297   bdb_options.is_fifo = true;
1298   Reopen(bdb_options, options);
1299   VerifyDB(data);
1300 
1301   // Put two large values, each on a different blob file.
1302   std::string large_value(10000, 'v');
1303   ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
1304   ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
1305   ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1306   ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1307   data["large_key1"] = large_value;
1308   data["large_key2"] = large_value;
1309   VerifyDB(data);
1310 
1311   // Put a third large value which will bring the DB out of space.
1312   // FIFO eviction will evict the file of large_key1.
1313   ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
1314   ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1315   ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1316   blob_db_impl()->TEST_DeleteObsoleteFiles();
1317   ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1318   data.erase("large_key1");
1319   data["large_key3"] = large_value;
1320   VerifyDB(data);
1321 
1322   // Putting some more small values. These values shouldn't be evicted by
1323   // compaction filter since they are inserted after FIFO eviction.
1324   ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
1325   ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
1326 
1327   // FIFO eviction doesn't trigger again since there enough room for the flush.
1328   ASSERT_OK(blob_db_->Flush(FlushOptions()));
1329   ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1330 
1331   // Manual compact and check if compaction filter evict those keys with
1332   // expiration < 60.
1333   ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1334   // All keys with expiration < 60, plus large_key1 is filtered by
1335   // compaction filter.
1336   ASSERT_EQ(num_keys_to_evict + 1,
1337             statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
1338   ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
1339   ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1340   data_after_compact["large_key2"] = large_value;
1341   data_after_compact["large_key3"] = large_value;
1342   VerifyDB(data_after_compact);
1343 }
1344 
TEST_F(BlobDBTest,GarbageCollection)1345 TEST_F(BlobDBTest, GarbageCollection) {
1346   constexpr size_t kNumPuts = 1 << 10;
1347 
1348   constexpr uint64_t kExpiration = 1000;
1349   constexpr uint64_t kCompactTime = 500;
1350 
1351   constexpr uint64_t kKeySize = 7;  // "key" + 4 digits
1352 
1353   constexpr uint64_t kSmallValueSize = 1 << 6;
1354   constexpr uint64_t kLargeValueSize = 1 << 8;
1355   constexpr uint64_t kMinBlobSize = 1 << 7;
1356   static_assert(kSmallValueSize < kMinBlobSize, "");
1357   static_assert(kLargeValueSize > kMinBlobSize, "");
1358 
1359   constexpr size_t kBlobsPerFile = 8;
1360   constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
1361   constexpr uint64_t kBlobFileSize =
1362       BlobLogHeader::kSize +
1363       (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;
1364 
1365   BlobDBOptions bdb_options;
1366   bdb_options.min_blob_size = kMinBlobSize;
1367   bdb_options.blob_file_size = kBlobFileSize;
1368   bdb_options.enable_garbage_collection = true;
1369   bdb_options.garbage_collection_cutoff = 0.25;
1370   bdb_options.disable_background_tasks = true;
1371 
1372   Options options;
1373   options.env = mock_env_.get();
1374   options.statistics = CreateDBStatistics();
1375 
1376   Open(bdb_options, options);
1377 
1378   std::map<std::string, std::string> data;
1379   std::map<std::string, KeyVersion> blob_value_versions;
1380   std::map<std::string, BlobIndexVersion> blob_index_versions;
1381 
1382   Random rnd(301);
1383 
1384   // Add a bunch of large non-TTL values. These will be written to non-TTL
1385   // blob files and will be subject to GC.
1386   for (size_t i = 0; i < kNumPuts; ++i) {
1387     std::ostringstream oss;
1388     oss << "key" << std::setw(4) << std::setfill('0') << i;
1389 
1390     const std::string key(oss.str());
1391     const std::string value(
1392         test::RandomHumanReadableString(&rnd, kLargeValueSize));
1393     const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1394 
1395     ASSERT_OK(Put(key, value));
1396     ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1397 
1398     data[key] = value;
1399     blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1400     blob_index_versions[key] =
1401         BlobIndexVersion(key, /* file_number */ (i >> 3) + 1, kNoExpiration,
1402                          sequence, kTypeBlobIndex);
1403   }
1404 
1405   // Add some small and/or TTL values that will be ignored during GC.
1406   // First, add a large TTL value will be written to its own TTL blob file.
1407   {
1408     const std::string key("key2000");
1409     const std::string value(
1410         test::RandomHumanReadableString(&rnd, kLargeValueSize));
1411     const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1412 
1413     ASSERT_OK(PutUntil(key, value, kExpiration));
1414     ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1415 
1416     data[key] = value;
1417     blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1418     blob_index_versions[key] =
1419         BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
1420                          sequence, kTypeBlobIndex);
1421   }
1422 
1423   // Now add a small TTL value (which will be inlined).
1424   {
1425     const std::string key("key3000");
1426     const std::string value(
1427         test::RandomHumanReadableString(&rnd, kSmallValueSize));
1428     const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1429 
1430     ASSERT_OK(PutUntil(key, value, kExpiration));
1431     ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1432 
1433     data[key] = value;
1434     blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
1435     blob_index_versions[key] = BlobIndexVersion(
1436         key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
1437   }
1438 
1439   // Finally, add a small non-TTL value (which will be stored as a regular
1440   // value).
1441   {
1442     const std::string key("key4000");
1443     const std::string value(
1444         test::RandomHumanReadableString(&rnd, kSmallValueSize));
1445     const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
1446 
1447     ASSERT_OK(Put(key, value));
1448     ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
1449 
1450     data[key] = value;
1451     blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
1452     blob_index_versions[key] = BlobIndexVersion(
1453         key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
1454   }
1455 
1456   VerifyDB(data);
1457   VerifyBaseDB(blob_value_versions);
1458   VerifyBaseDBBlobIndex(blob_index_versions);
1459 
1460   // At this point, we should have 128 immutable non-TTL files with file numbers
1461   // 1..128.
1462   {
1463     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1464     ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
1465     for (size_t i = 0; i < kNumBlobFiles; ++i) {
1466       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1467       ASSERT_EQ(live_imm_files[i]->GetFileSize(),
1468                 kBlobFileSize + BlobLogFooter::kSize);
1469     }
1470   }
1471 
1472   mock_env_->set_current_time(kCompactTime);
1473 
1474   ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1475 
1476   // We expect the data to remain the same and the blobs from the oldest N files
1477   // to be moved to new files. Sequence numbers get zeroed out during the
1478   // compaction.
1479   VerifyDB(data);
1480 
1481   for (auto &pair : blob_value_versions) {
1482     KeyVersion &version = pair.second;
1483     version.sequence = 0;
1484   }
1485 
1486   VerifyBaseDB(blob_value_versions);
1487 
1488   const uint64_t cutoff = static_cast<uint64_t>(
1489       bdb_options.garbage_collection_cutoff * kNumBlobFiles);
1490   for (auto &pair : blob_index_versions) {
1491     BlobIndexVersion &version = pair.second;
1492 
1493     version.sequence = 0;
1494 
1495     if (version.file_number == kInvalidBlobFileNumber) {
1496       continue;
1497     }
1498 
1499     if (version.file_number > cutoff) {
1500       continue;
1501     }
1502 
1503     version.file_number += kNumBlobFiles + 1;
1504   }
1505 
1506   VerifyBaseDBBlobIndex(blob_index_versions);
1507 
1508   const Statistics *const statistics = options.statistics.get();
1509   assert(statistics);
1510 
1511   ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
1512   ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
1513   ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
1514   ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
1515             cutoff * kBlobsPerFile);
1516   ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
1517             cutoff * kBlobsPerFile * kLargeValueSize);
1518 
1519   // At this point, we should have 128 immutable non-TTL files with file numbers
1520   // 33..128 and 130..161. (129 was taken by the TTL blob file.)
1521   {
1522     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1523     ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
1524     for (size_t i = 0; i < kNumBlobFiles; ++i) {
1525       uint64_t expected_file_number = i + cutoff + 1;
1526       if (expected_file_number > kNumBlobFiles) {
1527         ++expected_file_number;
1528       }
1529 
1530       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
1531       ASSERT_EQ(live_imm_files[i]->GetFileSize(),
1532                 kBlobFileSize + BlobLogFooter::kSize);
1533     }
1534   }
1535 }
1536 
// Verifies that garbage collection reports a failure (and the compaction
// returns Corruption) when the base DB contains a blob index that cannot be
// parsed.
TEST_F(BlobDBTest, GarbageCollectionFailure) {
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.disable_background_tasks = true;

  Options db_options;
  db_options.statistics = CreateDBStatistics();

  Open(bdb_options, db_options);

  // Write a couple of valid blobs. The statuses are checked so a failed setup
  // write is reported here rather than as a confusing mismatch further down.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("dead", "beef"));

  // Write a fake blob reference into the base DB that cannot be parsed.
  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(
      &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
      "not a valid blob index"));
  ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));

  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);
  auto blob_file = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));

  ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
                  .IsCorruption());

  const Statistics *const statistics = db_options.statistics.get();
  // Use a test assertion rather than assert() so the check is not compiled
  // out when NDEBUG is defined.
  ASSERT_TRUE(statistics != nullptr);

  // The two valid blobs ("bar" + "beef" = 7 bytes) are relocated into one new
  // file before the failure is recorded; no file counts as collected.
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
}
1577 
1578 // File should be evicted after expiration.
TEST_F(BlobDBTest, EvictExpiredFile) {
  // Scenario: one TTL blob is written at mock time 50 with a TTL of 100. After
  // the mock clock is moved to 250, evicting expired files must mark the blob
  // file immutable and obsolete, and deleting obsolete files must remove it.
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  bdb_options.ttl_range_secs = 100;

  Options options;
  options.env = mock_env_.get();

  Open(bdb_options, options);
  auto *const impl = blob_db_impl();

  mock_env_->set_current_time(50);
  std::map<std::string, std::string> expected;
  ASSERT_OK(PutWithTTL("foo", "bar", 100, &expected));

  // Exactly one blob file exists and it is still open for writes.
  const auto files_after_put = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files_after_put.size());
  const auto ttl_file = files_after_put[0];
  ASSERT_FALSE(ttl_file->Immutable());
  ASSERT_FALSE(ttl_file->Obsolete());
  VerifyDB(expected);

  // Advance the clock; the key should have expired by now.
  mock_env_->set_current_time(250);
  impl->TEST_EvictExpiredFiles();
  ASSERT_EQ(1, impl->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, impl->TEST_GetObsoleteFiles().size());
  ASSERT_TRUE(ttl_file->Immutable());
  ASSERT_TRUE(ttl_file->Obsolete());

  impl->TEST_DeleteObsoleteFiles();
  ASSERT_TRUE(impl->TEST_GetBlobFiles().empty());
  ASSERT_TRUE(impl->TEST_GetObsoleteFiles().empty());

  // The blob index still exists in the LSM tree even though the blob file has
  // been evicted. Make sure the read reports NotFound and leaves the output
  // untouched instead of returning a garbage value.
  std::string value;
  const Status get_status = blob_db_->Get(ReadOptions(), "foo", &value);
  ASSERT_TRUE(get_status.IsNotFound());
  ASSERT_TRUE(value.empty());
}
1612 
TEST_F(BlobDBTest, DisableFileDeletions) {
  // Checks that obsolete blob files survive TEST_DeleteObsoleteFiles() while
  // file deletions are disabled, and that they are removed once deletions are
  // enabled again, either in one forced call or in two regular calls.
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  auto *const impl = blob_db_impl();

  std::map<std::string, std::string> expected;
  for (const bool force : {true, false}) {
    ASSERT_OK(Put("foo", "v", &expected));

    // Close the single blob file and mark it obsolete.
    const auto files = impl->TEST_GetBlobFiles();
    ASSERT_EQ(1, files.size());
    const auto obsolete_candidate = files[0];
    ASSERT_OK(impl->TEST_CloseBlobFile(obsolete_candidate));
    impl->TEST_ObsoleteBlobFile(obsolete_candidate);
    ASSERT_EQ(1, impl->TEST_GetBlobFiles().size());
    ASSERT_EQ(1, impl->TEST_GetObsoleteFiles().size());

    // Disable file deletions twice in a row.
    ASSERT_OK(blob_db_->DisableFileDeletions());
    ASSERT_OK(blob_db_->DisableFileDeletions());

    // With deletions disabled, the obsolete file must stay in place.
    impl->TEST_DeleteObsoleteFiles();
    ASSERT_EQ(1, impl->TEST_GetBlobFiles().size());
    ASSERT_EQ(1, impl->TEST_GetObsoleteFiles().size());
    VerifyDB(expected);

    // Enable file deletions once. With force=true this is enough; otherwise a
    // second call is needed to balance the second disable.
    ASSERT_OK(blob_db_->EnableFileDeletions(force));
    impl->TEST_DeleteObsoleteFiles();
    if (!force) {
      ASSERT_EQ(1, impl->TEST_GetBlobFiles().size());
      ASSERT_EQ(1, impl->TEST_GetObsoleteFiles().size());
      VerifyDB(expected);

      // Second, non-forced enable.
      ASSERT_OK(blob_db_->EnableFileDeletions(false));
      impl->TEST_DeleteObsoleteFiles();
    }

    // Whatever the value of `force`, the file should be gone by now.
    ASSERT_TRUE(impl->TEST_GetBlobFiles().empty());
    ASSERT_TRUE(impl->TEST_GetObsoleteFiles().empty());
    VerifyDB({});
  }
}
1653 
TEST_F(BlobDBTest,MaintainBlobFileToSstMapping)1654 TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
1655   BlobDBOptions bdb_options;
1656   bdb_options.enable_garbage_collection = true;
1657   bdb_options.disable_background_tasks = true;
1658   Open(bdb_options);
1659 
1660   // Register some dummy blob files.
1661   blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
1662   blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
1663   blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
1664   blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
1665   blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
1666 
1667   // Initialize the blob <-> SST file mapping. First, add some SST files with
1668   // blob file references, then some without.
1669   std::vector<LiveFileMetaData> live_files;
1670 
1671   for (uint64_t i = 1; i <= 10; ++i) {
1672     LiveFileMetaData live_file;
1673     live_file.file_number = i;
1674     live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;
1675 
1676     live_files.emplace_back(live_file);
1677   }
1678 
1679   for (uint64_t i = 11; i <= 20; ++i) {
1680     LiveFileMetaData live_file;
1681     live_file.file_number = i;
1682 
1683     live_files.emplace_back(live_file);
1684   }
1685 
1686   blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);
1687 
1688   // Check that the blob <-> SST mappings have been correctly initialized.
1689   auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1690 
1691   ASSERT_EQ(blob_files.size(), 5);
1692 
1693   {
1694     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1695     ASSERT_EQ(live_imm_files.size(), 5);
1696     for (size_t i = 0; i < 5; ++i) {
1697       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1698     }
1699 
1700     ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
1701   }
1702 
1703   {
1704     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1705         {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
1706     const std::vector<bool> expected_obsolete{false, false, false, false,
1707                                               false};
1708     for (size_t i = 0; i < 5; ++i) {
1709       const auto &blob_file = blob_files[i];
1710       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1711       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1712     }
1713 
1714     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1715     ASSERT_EQ(live_imm_files.size(), 5);
1716     for (size_t i = 0; i < 5; ++i) {
1717       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1718     }
1719 
1720     ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
1721   }
1722 
1723   // Simulate a flush where the SST does not reference any blob files.
1724   {
1725     FlushJobInfo info{};
1726     info.file_number = 21;
1727     info.smallest_seqno = 1;
1728     info.largest_seqno = 100;
1729 
1730     blob_db_impl()->TEST_ProcessFlushJobInfo(info);
1731 
1732     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1733         {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
1734     const std::vector<bool> expected_obsolete{false, false, false, false,
1735                                               false};
1736     for (size_t i = 0; i < 5; ++i) {
1737       const auto &blob_file = blob_files[i];
1738       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1739       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1740     }
1741 
1742     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1743     ASSERT_EQ(live_imm_files.size(), 5);
1744     for (size_t i = 0; i < 5; ++i) {
1745       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1746     }
1747 
1748     ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
1749   }
1750 
1751   // Simulate a flush where the SST references a blob file.
1752   {
1753     FlushJobInfo info{};
1754     info.file_number = 22;
1755     info.oldest_blob_file_number = 5;
1756     info.smallest_seqno = 101;
1757     info.largest_seqno = 200;
1758 
1759     blob_db_impl()->TEST_ProcessFlushJobInfo(info);
1760 
1761     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1762         {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
1763     const std::vector<bool> expected_obsolete{false, false, false, false,
1764                                               false};
1765     for (size_t i = 0; i < 5; ++i) {
1766       const auto &blob_file = blob_files[i];
1767       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1768       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1769     }
1770 
1771     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1772     ASSERT_EQ(live_imm_files.size(), 5);
1773     for (size_t i = 0; i < 5; ++i) {
1774       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
1775     }
1776 
1777     ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
1778   }
1779 
1780   // Simulate a compaction. Some inputs and outputs have blob file references,
1781   // some don't. There is also a trivial move (which means the SST appears on
1782   // both the input and the output list). Blob file 1 loses all its linked SSTs,
1783   // and since it got marked immutable at sequence number 200 which has already
1784   // been flushed, it can be marked obsolete.
1785   {
1786     CompactionJobInfo info{};
1787     info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
1788     info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
1789     info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
1790     info.input_file_infos.emplace_back(
1791         CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
1792     info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
1793     info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
1794     info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
1795     info.output_file_infos.emplace_back(
1796         CompactionFileInfo{2, 24, kInvalidBlobFileNumber});
1797 
1798     blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
1799 
1800     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1801         {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
1802     const std::vector<bool> expected_obsolete{true, false, false, false, false};
1803     for (size_t i = 0; i < 5; ++i) {
1804       const auto &blob_file = blob_files[i];
1805       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1806       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1807     }
1808 
1809     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1810     ASSERT_EQ(live_imm_files.size(), 4);
1811     for (size_t i = 0; i < 4; ++i) {
1812       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
1813     }
1814 
1815     auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1816     ASSERT_EQ(obsolete_files.size(), 1);
1817     ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
1818   }
1819 
1820   // Simulate a failed compaction. No mappings should be updated.
1821   {
1822     CompactionJobInfo info{};
1823     info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
1824     info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
1825     info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
1826     info.status = Status::Corruption();
1827 
1828     blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
1829 
1830     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1831         {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
1832     const std::vector<bool> expected_obsolete{true, false, false, false, false};
1833     for (size_t i = 0; i < 5; ++i) {
1834       const auto &blob_file = blob_files[i];
1835       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1836       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1837     }
1838 
1839     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1840     ASSERT_EQ(live_imm_files.size(), 4);
1841     for (size_t i = 0; i < 4; ++i) {
1842       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
1843     }
1844 
1845     auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1846     ASSERT_EQ(obsolete_files.size(), 1);
1847     ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
1848   }
1849 
1850   // Simulate another compaction. Blob file 2 loses all its linked SSTs
1851   // but since it got marked immutable at sequence number 300 which hasn't
1852   // been flushed yet, it cannot be marked obsolete at this point.
1853   {
1854     CompactionJobInfo info{};
1855     info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
1856     info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
1857     info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
1858 
1859     blob_db_impl()->TEST_ProcessCompactionJobInfo(info);
1860 
1861     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1862         {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
1863     const std::vector<bool> expected_obsolete{true, false, false, false, false};
1864     for (size_t i = 0; i < 5; ++i) {
1865       const auto &blob_file = blob_files[i];
1866       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1867       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1868     }
1869 
1870     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1871     ASSERT_EQ(live_imm_files.size(), 4);
1872     for (size_t i = 0; i < 4; ++i) {
1873       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 2);
1874     }
1875 
1876     auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1877     ASSERT_EQ(obsolete_files.size(), 1);
1878     ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
1879   }
1880 
1881   // Simulate a flush with largest sequence number 300. This will make it
1882   // possible to mark blob file 2 obsolete.
1883   {
1884     FlushJobInfo info{};
1885     info.file_number = 26;
1886     info.smallest_seqno = 201;
1887     info.largest_seqno = 300;
1888 
1889     blob_db_impl()->TEST_ProcessFlushJobInfo(info);
1890 
1891     const std::vector<std::unordered_set<uint64_t>> expected_sst_files{
1892         {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
1893     const std::vector<bool> expected_obsolete{true, true, false, false, false};
1894     for (size_t i = 0; i < 5; ++i) {
1895       const auto &blob_file = blob_files[i];
1896       ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
1897       ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);
1898     }
1899 
1900     auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1901     ASSERT_EQ(live_imm_files.size(), 3);
1902     for (size_t i = 0; i < 3; ++i) {
1903       ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 3);
1904     }
1905 
1906     auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
1907     ASSERT_EQ(obsolete_files.size(), 2);
1908     ASSERT_EQ(obsolete_files[0]->BlobFileNumber(), 1);
1909     ASSERT_EQ(obsolete_files[1]->BlobFileNumber(), 2);
1910   }
1911 }
1912 
TEST_F(BlobDBTest,ShutdownWait)1913 TEST_F(BlobDBTest, ShutdownWait) {
1914   BlobDBOptions bdb_options;
1915   bdb_options.ttl_range_secs = 100;
1916   bdb_options.min_blob_size = 0;
1917   bdb_options.disable_background_tasks = false;
1918   Options options;
1919   options.env = mock_env_.get();
1920 
1921   SyncPoint::GetInstance()->LoadDependency({
1922       {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
1923       {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
1924       {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
1925       {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
1926   });
1927   // Force all tasks to be scheduled immediately.
1928   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1929       "TimeQueue::Add:item.end", [&](void *arg) {
1930         std::chrono::steady_clock::time_point *tp =
1931             static_cast<std::chrono::steady_clock::time_point *>(arg);
1932         *tp =
1933             std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
1934       });
1935 
1936   ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
1937       "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
1938         // Sleep 3 ms to increase the chance of data race.
1939         // We've synced up the code so that EvictExpiredFiles()
1940         // is called concurrently with ~BlobDBImpl().
1941         // ~BlobDBImpl() is supposed to wait for all background
1942         // task to shutdown before doing anything else. In order
1943         // to use the same test to reproduce a bug of the waiting
1944         // logic, we wait a little bit here, so that TSAN can
1945         // catch the data race.
1946         // We should improve the test if we find a better way.
1947         Env::Default()->SleepForMicroseconds(3000);
1948       });
1949 
1950   SyncPoint::GetInstance()->EnableProcessing();
1951 
1952   Open(bdb_options, options);
1953   mock_env_->set_current_time(50);
1954   std::map<std::string, std::string> data;
1955   ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
1956   auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
1957   ASSERT_EQ(1, blob_files.size());
1958   auto blob_file = blob_files[0];
1959   ASSERT_FALSE(blob_file->Immutable());
1960   ASSERT_FALSE(blob_file->Obsolete());
1961   VerifyDB(data);
1962 
1963   TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
1964   mock_env_->set_current_time(250);
1965   // The key should expired now.
1966   TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
1967 
1968   TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
1969   TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
1970   Close();
1971 
1972   SyncPoint::GetInstance()->DisableProcessing();
1973 }
1974 
1975 }  //  namespace blob_db
1976 }  // namespace ROCKSDB_NAMESPACE
1977 
1978 // A black-box test for the ttl wrapper around rocksdb
main(int argc,char ** argv)1979 int main(int argc, char** argv) {
1980   ::testing::InitGoogleTest(&argc, argv);
1981   return RUN_ALL_TESTS();
1982 }
1983 
1984 #else
1985 #include <stdio.h>
1986 
// Stub entry point for ROCKSDB_LITE builds: reports on stderr that the tests
// are skipped and exits successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
1991 
1992 #endif  // !ROCKSDB_LITE
1993