1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include <algorithm>
9 #include <chrono>
10 #include <cstdlib>
11 #include <iomanip>
12 #include <map>
13 #include <memory>
14 #include <sstream>
15 #include <string>
16 #include <vector>
17
18 #include "db/blob/blob_index.h"
19 #include "db/db_test_util.h"
20 #include "env/composite_env_wrapper.h"
21 #include "file/file_util.h"
22 #include "file/sst_file_manager_impl.h"
23 #include "port/port.h"
24 #include "rocksdb/utilities/debug.h"
25 #include "test_util/fault_injection_test_env.h"
26 #include "test_util/sync_point.h"
27 #include "test_util/testharness.h"
28 #include "util/cast_util.h"
29 #include "util/random.h"
30 #include "util/string_util.h"
31 #include "utilities/blob_db/blob_db.h"
32 #include "utilities/blob_db/blob_db_impl.h"
33
34 namespace ROCKSDB_NAMESPACE {
35 namespace blob_db {
36
37 class BlobDBTest : public testing::Test {
38 public:
39 const int kMaxBlobSize = 1 << 14;
40
41 struct BlobIndexVersion {
42 BlobIndexVersion() = default;
BlobIndexVersionROCKSDB_NAMESPACE::blob_db::BlobDBTest::BlobIndexVersion43 BlobIndexVersion(std::string _user_key, uint64_t _file_number,
44 uint64_t _expiration, SequenceNumber _sequence,
45 ValueType _type)
46 : user_key(std::move(_user_key)),
47 file_number(_file_number),
48 expiration(_expiration),
49 sequence(_sequence),
50 type(_type) {}
51
52 std::string user_key;
53 uint64_t file_number = kInvalidBlobFileNumber;
54 uint64_t expiration = kNoExpiration;
55 SequenceNumber sequence = 0;
56 ValueType type = kTypeValue;
57 };
58
BlobDBTest()59 BlobDBTest()
60 : dbname_(test::PerThreadDBPath("blob_db_test")),
61 mock_env_(new MockTimeEnv(Env::Default())),
62 fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
63 blob_db_(nullptr) {
64 Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
65 assert(s.ok());
66 }
67
~BlobDBTest()68 ~BlobDBTest() override {
69 SyncPoint::GetInstance()->ClearAllCallBacks();
70 Destroy();
71 }
72
TryOpen(BlobDBOptions bdb_options=BlobDBOptions (),Options options=Options ())73 Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
74 Options options = Options()) {
75 options.create_if_missing = true;
76 return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
77 }
78
Open(BlobDBOptions bdb_options=BlobDBOptions (),Options options=Options ())79 void Open(BlobDBOptions bdb_options = BlobDBOptions(),
80 Options options = Options()) {
81 ASSERT_OK(TryOpen(bdb_options, options));
82 }
83
Reopen(BlobDBOptions bdb_options=BlobDBOptions (),Options options=Options ())84 void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
85 Options options = Options()) {
86 assert(blob_db_ != nullptr);
87 delete blob_db_;
88 blob_db_ = nullptr;
89 Open(bdb_options, options);
90 }
91
Close()92 void Close() {
93 assert(blob_db_ != nullptr);
94 delete blob_db_;
95 blob_db_ = nullptr;
96 }
97
Destroy()98 void Destroy() {
99 if (blob_db_) {
100 Options options = blob_db_->GetOptions();
101 BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
102 delete blob_db_;
103 blob_db_ = nullptr;
104 ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
105 }
106 }
107
blob_db_impl()108 BlobDBImpl *blob_db_impl() {
109 return reinterpret_cast<BlobDBImpl *>(blob_db_);
110 }
111
Put(const Slice & key,const Slice & value,std::map<std::string,std::string> * data=nullptr)112 Status Put(const Slice &key, const Slice &value,
113 std::map<std::string, std::string> *data = nullptr) {
114 Status s = blob_db_->Put(WriteOptions(), key, value);
115 if (data != nullptr) {
116 (*data)[key.ToString()] = value.ToString();
117 }
118 return s;
119 }
120
Delete(const std::string & key,std::map<std::string,std::string> * data=nullptr)121 void Delete(const std::string &key,
122 std::map<std::string, std::string> *data = nullptr) {
123 ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
124 if (data != nullptr) {
125 data->erase(key);
126 }
127 }
128
PutWithTTL(const Slice & key,const Slice & value,uint64_t ttl,std::map<std::string,std::string> * data=nullptr)129 Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
130 std::map<std::string, std::string> *data = nullptr) {
131 Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
132 if (data != nullptr) {
133 (*data)[key.ToString()] = value.ToString();
134 }
135 return s;
136 }
137
PutUntil(const Slice & key,const Slice & value,uint64_t expiration)138 Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
139 return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
140 }
141
PutRandomWithTTL(const std::string & key,uint64_t ttl,Random * rnd,std::map<std::string,std::string> * data=nullptr)142 void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
143 std::map<std::string, std::string> *data = nullptr) {
144 int len = rnd->Next() % kMaxBlobSize + 1;
145 std::string value = test::RandomHumanReadableString(rnd, len);
146 ASSERT_OK(
147 blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
148 if (data != nullptr) {
149 (*data)[key] = value;
150 }
151 }
152
PutRandomUntil(const std::string & key,uint64_t expiration,Random * rnd,std::map<std::string,std::string> * data=nullptr)153 void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
154 std::map<std::string, std::string> *data = nullptr) {
155 int len = rnd->Next() % kMaxBlobSize + 1;
156 std::string value = test::RandomHumanReadableString(rnd, len);
157 ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
158 expiration));
159 if (data != nullptr) {
160 (*data)[key] = value;
161 }
162 }
163
PutRandom(const std::string & key,Random * rnd,std::map<std::string,std::string> * data=nullptr)164 void PutRandom(const std::string &key, Random *rnd,
165 std::map<std::string, std::string> *data = nullptr) {
166 PutRandom(blob_db_, key, rnd, data);
167 }
168
PutRandom(DB * db,const std::string & key,Random * rnd,std::map<std::string,std::string> * data=nullptr)169 void PutRandom(DB *db, const std::string &key, Random *rnd,
170 std::map<std::string, std::string> *data = nullptr) {
171 int len = rnd->Next() % kMaxBlobSize + 1;
172 std::string value = test::RandomHumanReadableString(rnd, len);
173 ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
174 if (data != nullptr) {
175 (*data)[key] = value;
176 }
177 }
178
PutRandomToWriteBatch(const std::string & key,Random * rnd,WriteBatch * batch,std::map<std::string,std::string> * data=nullptr)179 void PutRandomToWriteBatch(
180 const std::string &key, Random *rnd, WriteBatch *batch,
181 std::map<std::string, std::string> *data = nullptr) {
182 int len = rnd->Next() % kMaxBlobSize + 1;
183 std::string value = test::RandomHumanReadableString(rnd, len);
184 ASSERT_OK(batch->Put(key, value));
185 if (data != nullptr) {
186 (*data)[key] = value;
187 }
188 }
189
190 // Verify blob db contain expected data and nothing more.
VerifyDB(const std::map<std::string,std::string> & data)191 void VerifyDB(const std::map<std::string, std::string> &data) {
192 VerifyDB(blob_db_, data);
193 }
194
VerifyDB(DB * db,const std::map<std::string,std::string> & data)195 void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
196 // Verify normal Get
197 auto* cfh = db->DefaultColumnFamily();
198 for (auto &p : data) {
199 PinnableSlice value_slice;
200 ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
201 ASSERT_EQ(p.second, value_slice.ToString());
202 std::string value;
203 ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
204 ASSERT_EQ(p.second, value);
205 }
206
207 // Verify iterators
208 Iterator *iter = db->NewIterator(ReadOptions());
209 iter->SeekToFirst();
210 for (auto &p : data) {
211 ASSERT_TRUE(iter->Valid());
212 ASSERT_EQ(p.first, iter->key().ToString());
213 ASSERT_EQ(p.second, iter->value().ToString());
214 iter->Next();
215 }
216 ASSERT_FALSE(iter->Valid());
217 ASSERT_OK(iter->status());
218 delete iter;
219 }
220
VerifyBaseDB(const std::map<std::string,KeyVersion> & expected_versions)221 void VerifyBaseDB(
222 const std::map<std::string, KeyVersion> &expected_versions) {
223 auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
224 DB *db = blob_db_->GetRootDB();
225 const size_t kMaxKeys = 10000;
226 std::vector<KeyVersion> versions;
227 GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
228 ASSERT_EQ(expected_versions.size(), versions.size());
229 size_t i = 0;
230 for (auto &key_version : expected_versions) {
231 const KeyVersion &expected_version = key_version.second;
232 ASSERT_EQ(expected_version.user_key, versions[i].user_key);
233 ASSERT_EQ(expected_version.sequence, versions[i].sequence);
234 ASSERT_EQ(expected_version.type, versions[i].type);
235 if (versions[i].type == kTypeValue) {
236 ASSERT_EQ(expected_version.value, versions[i].value);
237 } else {
238 ASSERT_EQ(kTypeBlobIndex, versions[i].type);
239 PinnableSlice value;
240 ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
241 versions[i].value, &value));
242 ASSERT_EQ(expected_version.value, value.ToString());
243 }
244 i++;
245 }
246 }
247
VerifyBaseDBBlobIndex(const std::map<std::string,BlobIndexVersion> & expected_versions)248 void VerifyBaseDBBlobIndex(
249 const std::map<std::string, BlobIndexVersion> &expected_versions) {
250 const size_t kMaxKeys = 10000;
251 std::vector<KeyVersion> versions;
252 ASSERT_OK(
253 GetAllKeyVersions(blob_db_->GetRootDB(), "", "", kMaxKeys, &versions));
254 ASSERT_EQ(versions.size(), expected_versions.size());
255
256 size_t i = 0;
257 for (const auto &expected_pair : expected_versions) {
258 const BlobIndexVersion &expected_version = expected_pair.second;
259
260 ASSERT_EQ(versions[i].user_key, expected_version.user_key);
261 ASSERT_EQ(versions[i].sequence, expected_version.sequence);
262 ASSERT_EQ(versions[i].type, expected_version.type);
263 if (versions[i].type != kTypeBlobIndex) {
264 ASSERT_EQ(kInvalidBlobFileNumber, expected_version.file_number);
265 ASSERT_EQ(kNoExpiration, expected_version.expiration);
266
267 ++i;
268 continue;
269 }
270
271 BlobIndex blob_index;
272 ASSERT_OK(blob_index.DecodeFrom(versions[i].value));
273
274 const uint64_t file_number = !blob_index.IsInlined()
275 ? blob_index.file_number()
276 : kInvalidBlobFileNumber;
277 ASSERT_EQ(file_number, expected_version.file_number);
278
279 const uint64_t expiration =
280 blob_index.HasTTL() ? blob_index.expiration() : kNoExpiration;
281 ASSERT_EQ(expiration, expected_version.expiration);
282
283 ++i;
284 }
285 }
286
InsertBlobs()287 void InsertBlobs() {
288 WriteOptions wo;
289 std::string value;
290
291 Random rnd(301);
292 for (size_t i = 0; i < 100000; i++) {
293 uint64_t ttl = rnd.Next() % 86400;
294 PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
295 }
296
297 for (size_t i = 0; i < 10; i++) {
298 Delete("key" + ToString(i % 500));
299 }
300 }
301
302 const std::string dbname_;
303 std::unique_ptr<MockTimeEnv> mock_env_;
304 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
305 BlobDB *blob_db_;
306 }; // class BlobDBTest
307
// Writes 100 random values and checks that all of them can be read back.
TEST_F(BlobDBTest, Put) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  Open(bdb_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx != 100; ++idx) {
    const std::string key = "key" + ToString(idx);
    PutRandom(key, &rnd, &expected);
  }
  VerifyDB(expected);
}
320
// Writes values with random TTLs at mock time 50, advances the mock clock to
// 100 and verifies that only entries written with a TTL above 50 are visible.
TEST_F(BlobDBTest, PutWithTTL) {
  Options options;
  options.env = mock_env_.get();

  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.blob_file_size = 256 * 1000 * 1000;
  bdb_options.min_blob_size = 0;
  bdb_options.ttl_range_secs = 1000;
  Open(bdb_options, options);

  Random rnd(301);
  std::map<std::string, std::string> live_data;
  mock_env_->set_current_time(50);
  for (size_t idx = 0; idx < 100; ++idx) {
    const uint64_t ttl = rnd.Next() % 100;
    // Only entries with a TTL above 50 are tracked as expected data.
    std::map<std::string, std::string> *sink = nullptr;
    if (ttl > 50) {
      sink = &live_data;
    }
    PutRandomWithTTL("key" + ToString(idx), ttl, &rnd, sink);
  }
  mock_env_->set_current_time(100);

  auto *impl = static_cast<BlobDBImpl *>(blob_db_);
  auto files = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_TRUE(files[0]->HasTTL());
  ASSERT_OK(impl->TEST_CloseBlobFile(files[0]));
  VerifyDB(live_data);
}
346
// Writes values with random absolute expirations at mock time 50, advances
// the mock clock to 100 and verifies that only entries with an expiration
// above 100 are visible.
TEST_F(BlobDBTest, PutUntil) {
  Options options;
  options.env = mock_env_.get();

  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.blob_file_size = 256 * 1000 * 1000;
  bdb_options.min_blob_size = 0;
  bdb_options.ttl_range_secs = 1000;
  Open(bdb_options, options);

  Random rnd(301);
  std::map<std::string, std::string> live_data;
  mock_env_->set_current_time(50);
  for (size_t idx = 0; idx < 100; ++idx) {
    const uint64_t expiration = rnd.Next() % 100 + 50;
    // Only entries expiring after time 100 are tracked as expected data.
    std::map<std::string, std::string> *sink = nullptr;
    if (expiration > 100) {
      sink = &live_data;
    }
    PutRandomUntil("key" + ToString(idx), expiration, &rnd, sink);
  }
  mock_env_->set_current_time(100);

  auto *impl = static_cast<BlobDBImpl *>(blob_db_);
  auto files = impl->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  ASSERT_TRUE(files[0]->HasTTL());
  ASSERT_OK(impl->TEST_CloseBlobFile(files[0]));
  VerifyDB(live_data);
}
372
// Reads values back through the StackableDB interface and checks that the
// PinnableSlice and std::string Get overloads agree with what was written.
TEST_F(BlobDBTest, StackableDBGet) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  Open(bdb_options);

  Random rnd(301);
  std::map<std::string, std::string> data;
  for (size_t n = 0; n < 100; ++n) {
    PutRandom("key" + ToString(n), &rnd, &data);
  }

  StackableDB *const stackable = blob_db_;
  ColumnFamilyHandle *const default_cf = stackable->DefaultColumnFamily();
  for (size_t n = 0; n < 100; ++n) {
    const std::string key = "key" + ToString(n);

    PinnableSlice from_pinnable;
    ASSERT_OK(stackable->Get(ReadOptions(), default_cf, key, &from_pinnable));

    std::string from_string;
    ASSERT_OK(stackable->Get(ReadOptions(), default_cf, key, &from_string));

    ASSERT_EQ(from_string, from_pinnable.ToString());
    ASSERT_EQ(from_string, data[key]);
  }
}
395
// Checks the expiration reported by BlobDB::Get(): kNoExpiration for a key
// written without TTL, and write time + TTL for a key written with one.
TEST_F(BlobDBTest, GetExpiration) {
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  mock_env_->set_current_time(100);
  Open(bdb_options, options);
  // The writes must succeed for the reads below to be meaningful.
  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(PutWithTTL("key2", "value2", 200));
  PinnableSlice value;
  // Initialized so that a Get() that fails to set it cannot leave the
  // comparison below reading an indeterminate value.
  uint64_t expiration = 0;
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
  ASSERT_EQ("value1", value.ToString());
  ASSERT_EQ(kNoExpiration, expiration);
  ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
  ASSERT_EQ("value2", value.ToString());
  ASSERT_EQ(300 /* = 100 + 200 */, expiration);
}
414
// A Get() issued while the fault-injection file system is deactivated must
// report an IOError.
TEST_F(BlobDBTest, GetIOError) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;  // Make sure value write to blob file

  Options options;
  options.env = fault_injection_env_.get();
  Open(bdb_options, options);

  ColumnFamilyHandle *default_cf = blob_db_->DefaultColumnFamily();
  ASSERT_OK(Put("foo", "bar"));

  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  PinnableSlice result;
  const Status get_status =
      blob_db_->Get(ReadOptions(), default_cf, "foo", &result);
  ASSERT_TRUE(get_status.IsIOError());

  // Reactivate file system to allow test to close DB.
  fault_injection_env_->SetFilesystemActive(true);
}
431
// A Put() issued while the fault-injection file system is deactivated must
// report an IOError; after reactivation writes succeed again.
TEST_F(BlobDBTest, PutIOError) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;  // Make sure value write to blob file

  Options options;
  options.env = fault_injection_env_.get();
  Open(bdb_options, options);

  fault_injection_env_->SetFilesystemActive(false, Status::IOError());
  const Status failed_put = Put("foo", "v1");
  ASSERT_TRUE(failed_put.IsIOError());

  fault_injection_env_->SetFilesystemActive(true, Status::IOError());
  ASSERT_OK(Put("bar", "v1"));
}
444
// Writes 100 batches of 10 random values each and verifies the DB contents.
TEST_F(BlobDBTest, WriteBatch) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    WriteBatch batch;
    for (size_t j = 0; j < 10; j++) {
      PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
    }
    // Fail right at the write instead of later, in VerifyDB(), with a less
    // helpful message.
    ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  }
  VerifyDB(data);
}
461
// Writes 100 random values, deletes every fifth key and verifies what is
// left.
TEST_F(BlobDBTest, Delete) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  Open(bdb_options);

  Random rnd(301);
  std::map<std::string, std::string> remaining;
  for (size_t idx = 0; idx < 100; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, &remaining);
  }
  for (size_t idx = 0; idx < 100; idx += 5) {
    const std::string victim = "key" + ToString(idx);
    Delete(victim, &remaining);
  }
  VerifyDB(remaining);
}
477
// Writes 100 random values, deletes all of them through a single WriteBatch
// and verifies that the DB is empty afterwards.
TEST_F(BlobDBTest, DeleteBatch) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);
  for (size_t i = 0; i < 100; i++) {
    PutRandom("key" + ToString(i), &rnd);
  }
  WriteBatch batch;
  for (size_t i = 0; i < 100; i++) {
    // Check the status of building the batch, as the fixture's
    // PutRandomToWriteBatch() does for Put.
    ASSERT_OK(batch.Delete("key" + ToString(i)));
  }
  ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  // DB should be empty.
  VerifyDB({});
}
495
// Writes 10000 keys twice and verifies that the values of the second round
// are the ones that are visible.
TEST_F(BlobDBTest, Override) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  Open(bdb_options);

  Random rnd(301);
  // First round: the written values are not tracked.
  for (int k = 0; k < 10000; ++k) {
    PutRandom("key" + ToString(k), &rnd);
  }

  // Second round: override all the keys and remember the new values.
  std::map<std::string, std::string> latest;
  for (int k = 0; k < 10000; ++k) {
    PutRandom("key" + ToString(k), &rnd, &latest);
  }
  VerifyDB(latest);
}
512
513 #ifdef SNAPPY
// Writes values both directly and through write batches with Snappy
// compression enabled, then verifies the DB contents.
TEST_F(BlobDBTest, Compression) {
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  bdb_options.compression = CompressionType::kSnappyCompression;
  Open(bdb_options);
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 100; i++) {
    PutRandom("put-key" + ToString(i), &rnd, &data);
  }
  for (int i = 0; i < 100; i++) {
    WriteBatch batch;
    for (size_t j = 0; j < 10; j++) {
      PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
                            &batch, &data);
    }
    // Fail right at the write instead of later, in VerifyDB().
    ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
  }
  VerifyDB(data);
}
535
// Values written with Snappy compression must remain readable after the DB
// is reopened with compression turned off.
TEST_F(BlobDBTest, DecompressAfterReopen) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  bdb_options.compression = CompressionType::kSnappyCompression;
  Open(bdb_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx < 100; ++idx) {
    const std::string key = "put-key" + ToString(idx);
    PutRandom(key, &rnd, &expected);
  }
  VerifyDB(expected);

  bdb_options.compression = CompressionType::kNoCompression;
  Reopen(bdb_options);
  VerifyDB(expected);
}
552 #endif
553
// Runs 10 writer threads concurrently. The first five write with Put, the
// others through single-entry write batches. Each thread records what it
// wrote in its own map; the union of those maps is then verified.
TEST_F(BlobDBTest, MultipleWriters) {
  Open(BlobDBOptions());

  std::vector<port::Thread> workers;
  std::vector<std::map<std::string, std::string>> data_set(10);
  for (uint32_t i = 0; i < 10; i++) {
    workers.push_back(port::Thread(
        [&](uint32_t id) {
          Random rnd(301 + id);
          for (int j = 0; j < 100; j++) {
            std::string key = "key" + ToString(id) + "_" + ToString(j);
            if (id < 5) {
              PutRandom(key, &rnd, &data_set[id]);
            } else {
              WriteBatch batch;
              PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
              // Fail right at the write instead of later, in VerifyDB().
              ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
            }
          }
        },
        i));
  }
  std::map<std::string, std::string> data;
  for (size_t i = 0; i < 10; i++) {
    workers[i].join();
    data.insert(data_set[i].begin(), data_set[i].end());
  }
  VerifyDB(data);
}
582
// Verifies that blob file deletions are scheduled through the
// SstFileManager, both when obsolete files are deleted and when the DB is
// destroyed.
TEST_F(BlobDBTest, SstFileManager) {
  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  Options db_options;

  // Count the scheduled deletions whose path contains ".blob".
  int files_scheduled_to_delete = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;

  Open(bdb_options, db_options);

  // Create one obsolete file and clean it.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  std::shared_ptr<BlobFile> bfile = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_impl()->TEST_DeleteObsoleteFiles();

  // Even if SSTFileManager is not set, DB is creating a dummy one.
  ASSERT_EQ(1, files_scheduled_to_delete);
  Destroy();
  // Make sure that DestroyBlobDB() also goes through delete scheduler.
  ASSERT_EQ(2, files_scheduled_to_delete);
  SyncPoint::GetInstance()->DisableProcessing();
  sfm->WaitForEmptyTrash();
}
629
// Verifies that reopening the DB schedules leftover ".blob.trash" files in
// the blob directory for deletion through the SstFileManager.
TEST_F(BlobDBTest, SstFileManagerRestart) {
  // Count the scheduled deletions whose path contains ".blob".
  int files_scheduled_to_delete = 0;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg) {
        assert(arg);
        const std::string *const file_path =
            static_cast<const std::string *>(arg);
        if (file_path->find(".blob") != std::string::npos) {
          ++files_scheduled_to_delete;
        }
      });

  std::shared_ptr<SstFileManager> sst_file_manager(
      NewSstFileManager(mock_env_.get()));
  sst_file_manager->SetDeleteRateBytesPerSecond(1);
  SstFileManagerImpl *sfm =
      static_cast<SstFileManagerImpl *>(sst_file_manager.get());

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  Options db_options;

  SyncPoint::GetInstance()->EnableProcessing();
  db_options.sst_file_manager = sst_file_manager;

  Open(bdb_options, db_options);
  std::string blob_dir = blob_db_impl()->TEST_blob_dir();
  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "bar"));
  Close();

  // Create 3 dummy trash files under the blob_dir. Only the two whose name
  // contains ".blob" are counted by the callback above.
  LegacyFileSystemWrapper fs(db_options.env);
  ASSERT_OK(CreateFile(&fs, blob_dir + "/000666.blob.trash", "", false));
  ASSERT_OK(CreateFile(&fs, blob_dir + "/000888.blob.trash", "", true));
  ASSERT_OK(
      CreateFile(&fs, blob_dir + "/something_not_match.trash", "", false));

  // Make sure that reopening the DB rescans the existing trash files.
  Open(bdb_options, db_options);
  ASSERT_EQ(files_scheduled_to_delete, 2);

  sfm->WaitForEmptyTrash();

  // There should be exactly one file under the blob dir now. Entries whose
  // name starts with '.' are not counted.
  std::vector<std::string> all_files;
  ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
  int nfiles = 0;
  for (const auto &f : all_files) {
    assert(!f.empty());
    if (f[0] == '.') {
      continue;
    }
    nfiles++;
  }
  ASSERT_EQ(nfiles, 1);

  SyncPoint::GetInstance()->DisableProcessing();
}
688
// Runs the same write / compact / delete sequence four times, taking a
// snapshot at a different point each time, and checks how many blob files
// remain after obsolete files are deleted, before and after the snapshot is
// released.
TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.min_blob_size = 0;

  // snapshot_point = when to take snapshot
  for (int snapshot_point = 0; snapshot_point < 4; ++snapshot_point) {
    Destroy();
    Open(bdb_options);

    const Snapshot *snap = nullptr;

    // First file
    ASSERT_OK(Put("key1", "value"));
    if (snapshot_point == 0) {
      snap = blob_db_->GetSnapshot();
    }

    auto files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(1, files.size());
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(files[0]));

    // Second file
    ASSERT_OK(Put("key2", "value"));
    if (snapshot_point == 1) {
      snap = blob_db_->GetSnapshot();
    }

    files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(2, files.size());
    auto second_file = files[1];
    ASSERT_FALSE(second_file->Immutable());
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(second_file));

    // Third file
    ASSERT_OK(Put("key3", "value"));
    if (snapshot_point == 2) {
      snap = blob_db_->GetSnapshot();
    }

    ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    ASSERT_TRUE(second_file->Obsolete());
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
              second_file->GetObsoleteSequence());

    Delete("key2");
    if (snapshot_point == 3) {
      snap = blob_db_->GetSnapshot();
    }

    ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
    blob_db_impl()->TEST_DeleteObsoleteFiles();

    if (snapshot_point < 2) {
      // The snapshot will see data in second_file, so the file shouldn't be
      // deleted
      ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
      blob_db_->ReleaseSnapshot(snap);
      blob_db_impl()->TEST_DeleteObsoleteFiles();
      ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
    } else {
      // The snapshot shouldn't see data in second_file
      ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
      blob_db_->ReleaseSnapshot(snap);
    }
  }
}
757
// Creating a column family succeeds, but reads and writes that involve a
// non-default column family must be rejected with NotSupported.
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
  Options options;
  options.env = mock_env_.get();
  mock_env_->set_current_time(0);
  Open(BlobDBOptions(), options);
  ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
  ColumnFamilyHandle *handle = nullptr;
  std::string value;
  std::vector<std::string> values;
  // The call simply passes through to the base db. It should succeed.
  ASSERT_OK(
      blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
  ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
  ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
                  .IsNotSupported());
  ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
                  .IsNotSupported());
  // Building the batch itself is expected to work; only writing it to the
  // blob DB is rejected.
  WriteBatch batch;
  ASSERT_OK(batch.Put("k1", "v1"));
  ASSERT_OK(batch.Put(handle, "k2", "v2"));
  ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
  // The rejected batch must not have written its default-column-family
  // entry either.
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
  ASSERT_TRUE(
      blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
  auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
                                     {"k1", "k2"}, &values);
  ASSERT_EQ(2, statuses.size());
  ASSERT_TRUE(statuses[0].IsNotSupported());
  ASSERT_TRUE(statuses[1].IsNotSupported());
  ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
  delete handle;
}
790
// Checks that the blob file shows up in GetLiveFilesMetaData() and in
// GetLiveFiles() under a path relative to the DB name.
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  bdb_options.min_blob_size = 0;
  bdb_options.path_relative = true;
  bdb_options.blob_dir = "blob_dir";
  Open(bdb_options);

  Random rnd(301);
  std::map<std::string, std::string> expected;
  for (size_t idx = 0; idx < 100; ++idx) {
    PutRandom("key" + ToString(idx), &rnd, &expected);
  }

  // Path should be relative to db_name, but begin with slash.
  const std::string blob_file_name = "/blob_dir/000001.blob";

  std::vector<LiveFileMetaData> live_meta;
  blob_db_->GetLiveFilesMetaData(&live_meta);
  ASSERT_EQ(1U, live_meta.size());
  ASSERT_EQ(blob_file_name, live_meta[0].name);
  ASSERT_EQ(1, live_meta[0].file_number);
  ASSERT_EQ("default", live_meta[0].column_family_name);

  std::vector<std::string> live_files;
  uint64_t manifest_file_size;
  ASSERT_OK(blob_db_->GetLiveFiles(live_files, &manifest_file_size, false));
  ASSERT_EQ(4U, live_files.size());
  ASSERT_EQ(blob_file_name, live_files[3]);

  VerifyDB(expected);
}
818
// Writes data with plain RocksDB, reopens the same directory as a blob DB
// and overwrites some keys, then reopens it as plain RocksDB again and
// checks which keys are still readable.
TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
  constexpr size_t kNumKey = 20;
  constexpr size_t kNumIteration = 10;
  Random rnd(301);
  std::map<std::string, std::string> data;
  // written_by_blob_db[k] is set once "key<k>" has been written by blob db.
  std::vector<bool> written_by_blob_db(kNumKey, false);

  Options options;
  options.create_if_missing = true;

  // Write to plain rocksdb.
  DB *plain_db = nullptr;
  ASSERT_OK(DB::Open(options, dbname_, &plain_db));
  for (size_t round = 0; round < kNumIteration; ++round) {
    const size_t key_index = rnd.Next() % kNumKey;
    PutRandom(plain_db, "key" + ToString(key_index), &rnd, &data);
  }
  VerifyDB(plain_db, data);
  delete plain_db;
  plain_db = nullptr;

  // Open as blob db. Verify it can read existing data.
  Open();
  VerifyDB(blob_db_, data);
  for (size_t round = 0; round < kNumIteration; ++round) {
    const size_t key_index = rnd.Next() % kNumKey;
    written_by_blob_db[key_index] = true;
    PutRandom(blob_db_, "key" + ToString(key_index), &rnd, &data);
  }
  VerifyDB(blob_db_, data);
  delete blob_db_;
  blob_db_ = nullptr;

  // Verify plain db return error for keys written by blob db.
  ASSERT_OK(DB::Open(options, dbname_, &plain_db));
  std::string value;
  for (size_t k = 0; k < kNumKey; ++k) {
    const std::string key = "key" + ToString(k);
    const Status s = plain_db->Get(ReadOptions(), key, &value);
    if (data.count(key) == 0) {
      ASSERT_TRUE(s.IsNotFound());
    } else if (written_by_blob_db[k]) {
      ASSERT_TRUE(s.IsNotSupported());
    } else {
      ASSERT_OK(s);
      ASSERT_EQ(data[key], value);
    }
  }
  delete plain_db;
}
870
871 // Test to verify that a NoSpace IOError Status is returned on reaching
872 // max_db_size limit.
TEST_F(BlobDBTest, OutOfSpace) {
  // Use mock env to stop wall clock.
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.is_fifo = false;
  bdb_options.disable_background_tasks = true;
  // `options` has to be handed to Open(); otherwise the DB runs on the
  // default env and the mock env configured above is never used.
  Open(bdb_options, options);

  // Each stored blob has an overhead of about 42 bytes currently.
  // So a small key + a 100 byte blob should take up ~150 bytes in the db.
  std::string value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));

  // Putting another blob should fail as adding it would exceed the
  // max_db_size limit.
  Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
  ASSERT_TRUE(s.IsIOError());
  ASSERT_TRUE(s.IsNoSpace());
}
894
// Exercises FIFO eviction with one blob per file: every write that pushes the
// DB past max_db_size marks the oldest blob file obsolete, and the evicted
// keys stay readable until the obsolete files are actually deleted.
TEST_F(BlobDBTest, FIFOEviction) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 200;
  bdb_options.blob_file_size = 100;
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  // Count evictions via the "Evicted" sync point.
  std::atomic<int> evict_count{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&](void *) { evict_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Each stored blob has an overhead of 32 bytes currently.
  // So a 100 byte blob should take up 132 bytes.
  std::string value(100, 'v');
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
  VerifyDB({{"key1", value}});

  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());

  // Adding another 100 byte blob would take the total size to 264 bytes
  // (2*132), which is greater than max_db_size and triggers FIFO eviction.
  ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
  ASSERT_EQ(1, evict_count);
  // key1 will exist until the corresponding file is deleted.
  VerifyDB({{"key1", value}, {"key2", value}});

  // Adding another 100 byte blob without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
  ASSERT_EQ(2, evict_count);
  // key1 and key2 will exist until the corresponding files are deleted.
  VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});

  // The fourth blob file, without TTL.
  ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
  ASSERT_EQ(3, evict_count);
  VerifyDB(
      {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});

  // The three oldest files are obsolete, in eviction order; only the newest
  // file is still live.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(4, blob_files.size());
  ASSERT_TRUE(blob_files[0]->Obsolete());
  ASSERT_TRUE(blob_files[1]->Obsolete());
  ASSERT_TRUE(blob_files[2]->Obsolete());
  ASSERT_FALSE(blob_files[3]->Obsolete());
  auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_EQ(3, obsolete_files.size());
  ASSERT_EQ(blob_files[0], obsolete_files[0]);
  ASSERT_EQ(blob_files[1], obsolete_files[1]);
  ASSERT_EQ(blob_files[2], obsolete_files[2]);

  // Once the obsolete files are deleted, only key4 remains readable.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
  ASSERT_TRUE(obsolete_files.empty());
  VerifyDB({{"key4", value}});

  // The callback captures `evict_count` by reference; stop processing and
  // drop it before the local goes out of scope so nothing dangles.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
954
// With FIFO enabled but no blob file present yet, a value larger than
// max_db_size must be rejected with NoSpace without any eviction happening.
TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.blob_file_size = 5000;
  bdb_options.is_fifo = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  // Count evictions via the "Evicted" sync point.
  std::atomic<int> evict_count{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictOldestBlobFile:Evicted",
      [&](void *) { evict_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  // A single 2000 byte value already exceeds max_db_size (1000).
  std::string value(2000, 'v');
  ASSERT_TRUE(Put("foo", value).IsNoSpace());
  ASSERT_EQ(0, evict_count);

  // The callback captures `evict_count` by reference; stop processing and
  // drop it before the local goes out of scope so nothing dangles.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
974
// Checks that the FIFO size budget includes live SST bytes, and that a write
// fails with NoSpace (without evicting anything further) when eviction could
// not free enough room for it.
TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  options.disable_auto_compactions = true;
  auto stats = CreateDBStatistics();
  options.statistics = stats;

  BlobDBOptions bdb_options;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options, options);

  // Number of blob files evicted by FIFO so far, as reported by statistics.
  const auto num_evicted = [&stats]() {
    return stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED);
  };

  ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());

  // Put some data into the LSM tree so that the SST file size becomes part
  // of what FIFO eviction has to account for.
  std::map<std::string, std::string> expected;
  const std::string small_value(50, 'v');
  for (int idx = 0; idx != 1000; ++idx) {
    ASSERT_OK(Put("key" + ToString(idx), small_value, &expected));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));

  uint64_t sst_bytes = 0;
  ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
                                       &sst_bytes));
  ASSERT_TRUE(sst_bytes > 0);
  ASSERT_EQ(sst_bytes, blob_db_impl()->TEST_live_sst_size());

  // Leave 2000 bytes of headroom on top of the SST files and reopen.
  bdb_options.max_db_size = sst_bytes + 2000;
  Reopen(bdb_options, options);
  ASSERT_EQ(sst_bytes, blob_db_impl()->TEST_live_sst_size());

  const std::string one_kb_value(1000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", one_kb_value, 60, &expected));
  ASSERT_EQ(0, num_evicted());
  VerifyDB(expected);

  // large_key2 evicts large_key1.
  ASSERT_OK(PutWithTTL("large_key2", one_kb_value, 60, &expected));
  ASSERT_EQ(1, num_evicted());
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  expected.erase("large_key1");
  VerifyDB(expected);

  // large_key3 would not fit even after evicting large_key2, so the write
  // returns a no-space error and the eviction counter stays put.
  const std::string two_kb_value(2000, 'v');
  ASSERT_TRUE(PutWithTTL("large_key3", two_kb_value, 60).IsNoSpace());
  ASSERT_EQ(1, num_evicted());

  // large_key2 must still be there.
  VerifyDB(expected);
}
1025
1026 // Test flush or compaction will trigger FIFO eviction since they update
1027 // total SST file size.
TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
  Options options;
  // Use mock env to stop wall clock.
  options.env = mock_env_.get();
  options.compression = kNoCompression;
  auto stats = CreateDBStatistics();
  options.statistics = stats;

  BlobDBOptions bdb_options;
  bdb_options.max_db_size = 1000;
  bdb_options.is_fifo = true;
  bdb_options.min_blob_size = 100;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options, options);

  // Number of blob files evicted by FIFO so far, as reported by statistics.
  const auto num_evicted = [&stats]() {
    return stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED);
  };

  // One 800 byte blob fits within max_db_size on its own.
  const std::string large_value(800, 'v');
  ASSERT_OK(PutWithTTL("large_key", large_value, 60));
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, num_evicted());
  VerifyDB({{"large_key", large_value}});

  // Write a handful of small keys and flush; the resulting SST file brings
  // the DB out of space.
  std::map<std::string, std::string> small_data;
  for (int idx = 0; idx != 10; ++idx) {
    ASSERT_OK(Put("key" + ToString(idx), "v", &small_data));
  }
  ASSERT_OK(blob_db_->Flush(FlushOptions()));

  // large_key's blob file must have been evicted; only the small keys stay.
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(1, num_evicted());
  VerifyDB(small_data);
}
1061
// Writes a random mix of small/large values with and without TTL and checks
// the value type recorded in the base DB for each key: only small non-TTL
// values are expected as kTypeValue, everything else as kTypeBlobIndex.
// Also checks that exactly one TTL and one non-TTL blob file were created.
TEST_F(BlobDBTest, InlineSmallValues) {
  constexpr uint64_t kMaxExpiration = 1000;
  Random rnd(301);
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = kMaxExpiration;
  bdb_options.min_blob_size = 100;
  bdb_options.blob_file_size = 256 * 1000 * 1000;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  mock_env_->set_current_time(0);
  Open(bdb_options, options);
  std::map<std::string, std::string> data;
  std::map<std::string, KeyVersion> versions;
  for (size_t i = 0; i < 1000; i++) {
    // Keep the order of the rnd calls fixed so the sequence stays
    // reproducible for the given seed.
    bool is_small_value = rnd.Next() % 2;
    bool has_ttl = rnd.Next() % 2;
    uint64_t expiration = rnd.Next() % kMaxExpiration;
    // 50 is below min_blob_size (100), 200 is above it.
    int len = is_small_value ? 50 : 200;
    std::string key = "key" + ToString(i);
    std::string value = test::RandomHumanReadableString(&rnd, len);
    data[key] = value;
    // Each write is expected to consume exactly one sequence number.
    SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
    if (!has_ttl) {
      ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
    } else {
      ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
    }
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
    versions[key] =
        KeyVersion(key, value, sequence,
                   (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
  }
  VerifyDB(data);
  VerifyBaseDB(versions);

  // Use the fixture accessor, like the other tests, instead of a raw cast.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  std::shared_ptr<BlobFile> non_ttl_file;
  std::shared_ptr<BlobFile> ttl_file;
  // The two files may come back in either order.
  if (blob_files[0]->HasTTL()) {
    ttl_file = blob_files[0];
    non_ttl_file = blob_files[1];
  } else {
    non_ttl_file = blob_files[0];
    ttl_file = blob_files[1];
  }
  ASSERT_FALSE(non_ttl_file->HasTTL());
  ASSERT_TRUE(ttl_file->HasTTL());
}
1113
// Opening a blob DB must fail with NotSupported when the user supplies either
// a compaction filter or a compaction filter factory.
TEST_F(BlobDBTest, CompactionFilterNotSupported) {
  class TestCompactionFilter : public CompactionFilter {
    const char *Name() const override { return "TestCompactionFilter"; }
  };
  class TestCompactionFilterFactory : public CompactionFilterFactory {
    const char *Name() const override { return "TestCompactionFilterFactory"; }
    std::unique_ptr<CompactionFilter> CreateCompactionFilter(
        const CompactionFilter::Context & /*context*/) override {
      return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
    }
  };
  // Iteration 0 sets a raw filter, iteration 1 sets a filter factory.
  for (int i = 0; i < 2; i++) {
    Options options;
    // Owns the raw filter (if any) so it is released even when the fatal
    // assertion below fails and returns early from the test body.
    std::unique_ptr<CompactionFilter> filter_guard;
    if (i == 0) {
      filter_guard.reset(new TestCompactionFilter());
      options.compaction_filter = filter_guard.get();
    } else {
      options.compaction_filter_factory.reset(
          new TestCompactionFilterFactory());
    }
    ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
  }
}
1137
// Test compaction filter should remove any expired blob index.
TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
  constexpr size_t kNumKeys = 100;
  constexpr size_t kNumPuts = 1000;
  constexpr uint64_t kMaxExpiration = 1000;
  constexpr uint64_t kCompactTime = 500;
  constexpr uint64_t kMinBlobSize = 100;
  Random rnd(301);
  mock_env_->set_current_time(0);
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = kMinBlobSize;
  bdb_options.disable_background_tasks = true;
  Options options;
  options.env = mock_env_.get();
  Open(bdb_options, options);

  // `data` tracks the latest value written for every key;
  // `data_after_compact` tracks what should survive a compaction that runs
  // at kCompactTime.
  std::map<std::string, std::string> data;
  std::map<std::string, std::string> data_after_compact;
  for (size_t i = 0; i < kNumPuts; i++) {
    // Keep the order of the rnd calls fixed so the sequence stays
    // reproducible for the given seed.
    bool is_small_value = rnd.Next() % 2;
    bool has_ttl = rnd.Next() % 2;
    uint64_t expiration = rnd.Next() % kMaxExpiration;
    int len = is_small_value ? 10 : 200;
    std::string key = "key" + ToString(rnd.Next() % kNumKeys);
    std::string value = test::RandomHumanReadableString(&rnd, len);
    if (!has_ttl) {
      if (is_small_value) {
        std::string blob_entry;
        BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
        // Fake blob index with TTL. See what it will do.
        ASSERT_GT(kMinBlobSize, blob_entry.size());
        value = blob_entry;
      }
      ASSERT_OK(Put(key, value));
      data_after_compact[key] = value;
    } else {
      ASSERT_OK(PutUntil(key, value, expiration));
      // Entries expiring at or before the compaction time should be dropped.
      if (expiration <= kCompactTime) {
        data_after_compact.erase(key);
      } else {
        data_after_compact[key] = value;
      }
    }
    data[key] = value;
  }
  VerifyDB(data);

  mock_env_->set_current_time(kCompactTime);
  // Take a snapshot before compaction. Make sure expired blob indexes are
  // filtered regardless of the snapshot.
  const Snapshot *snapshot = blob_db_->GetSnapshot();
  // Issue manual compaction to trigger compaction filter.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  blob_db_->ReleaseSnapshot(snapshot);
  // Verify expired blob indexes are filtered.
  std::vector<KeyVersion> versions;
  const size_t kMaxKeys = 10000;
  // Check the returned status; a failed scan would otherwise surface only as
  // a confusing size mismatch below.
  ASSERT_OK(GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions));
  ASSERT_EQ(data_after_compact.size(), versions.size());
  for (auto &version : versions) {
    ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
  }
  VerifyDB(data_after_compact);
}
1202
1203 // Test compaction filter should remove any blob index where corresponding
1204 // blob file has been removed.
TEST_F(BlobDBTest, FilterFileNotAvailable) {
  Options options;
  options.disable_auto_compactions = true;
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options, options);

  // "foo" goes to blob file #1, which is closed right away so that the next
  // write has to go to a new file.
  ASSERT_OK(Put("foo", "v1"));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));

  // "bar" goes to blob file #2, which is closed as well.
  ASSERT_OK(Put("bar", "v2"));
  blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(2, blob_files.size());
  ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));

  const size_t kMaxKeys = 10000;
  DB *root_db = blob_db_->GetRootDB();
  std::vector<KeyVersion> versions;

  // Reloads `versions` with every key version currently in the base DB.
  auto load_versions = [&]() {
    return GetAllKeyVersions(root_db, "", "", kMaxKeys, &versions);
  };
  // Runs a manual compaction over the whole key range.
  auto compact_all = [this]() {
    return blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
  };

  // Both keys are present before any compaction.
  ASSERT_OK(load_versions());
  ASSERT_EQ(2, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  ASSERT_EQ("foo", versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});

  // Compacting while both blob files still exist must not drop anything.
  ASSERT_OK(compact_all());
  ASSERT_OK(load_versions());
  ASSERT_EQ(2, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  ASSERT_EQ("foo", versions[1].user_key);
  VerifyDB({{"bar", "v2"}, {"foo", "v1"}});

  // Remove the first blob file and compact. foo should be removed from the
  // base db.
  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(compact_all());
  ASSERT_OK(load_versions());
  ASSERT_EQ(1, versions.size());
  ASSERT_EQ("bar", versions[0].user_key);
  VerifyDB({{"bar", "v2"}});

  // Remove the second blob file and compact. bar should be removed from the
  // base db.
  blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_OK(compact_all());
  ASSERT_OK(load_versions());
  ASSERT_EQ(0, versions.size());
  VerifyDB({});
}
1259
// Test compaction filter should filter any inlined TTL keys that would have
// been dropped by last FIFO eviction if they are stored out-of-line.
TEST_F(BlobDBTest, FilterForFIFOEviction) {
  Random rnd(215);

  Options options;
  // Use mock env to stop wall clock.
  mock_env_->set_current_time(0);
  options.env = mock_env_.get();
  auto stats = CreateDBStatistics();
  options.statistics = stats;
  options.disable_auto_compactions = true;

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 100;
  bdb_options.ttl_range_secs = 60;
  bdb_options.max_db_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options, options);

  // Number of blob files evicted by FIFO so far, as reported by statistics.
  const auto num_files_evicted = [&stats]() {
    return stats->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED);
  };

  std::map<std::string, std::string> all_data;
  std::map<std::string, std::string> surviving_data;
  // Insert some small values that will be inlined. Only those with a TTL of
  // at least 60 are expected to outlive the compaction at the end.
  for (int idx = 0; idx != 1000; ++idx) {
    const std::string key = "key" + ToString(idx);
    const std::string value = test::RandomHumanReadableString(&rnd, 50);
    const uint64_t ttl = rnd.Next() % 120 + 1;
    ASSERT_OK(PutWithTTL(key, value, ttl, &all_data));
    if (ttl >= 60) {
      surviving_data[key] = value;
    }
  }
  const uint64_t expected_evicted_keys =
      all_data.size() - surviving_data.size();
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  const uint64_t sst_bytes = blob_db_impl()->TEST_live_sst_size();
  ASSERT_GT(sst_bytes, 0);
  VerifyDB(all_data);

  // Reopen with FIFO on and 30000 bytes of headroom above the SST files.
  bdb_options.max_db_size = sst_bytes + 30000;
  bdb_options.is_fifo = true;
  Reopen(bdb_options, options);
  VerifyDB(all_data);

  // Put two large values, each on a different blob file.
  const std::string big_value(10000, 'v');
  ASSERT_OK(PutWithTTL("large_key1", big_value, 90));
  ASSERT_OK(PutWithTTL("large_key2", big_value, 150));
  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  ASSERT_EQ(0, num_files_evicted());
  all_data["large_key1"] = big_value;
  all_data["large_key2"] = big_value;
  VerifyDB(all_data);

  // Put a third large value which will bring the DB out of space.
  // FIFO eviction will evict the file of large_key1.
  ASSERT_OK(PutWithTTL("large_key3", big_value, 150));
  ASSERT_EQ(1, num_files_evicted());
  ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  all_data.erase("large_key1");
  all_data["large_key3"] = big_value;
  VerifyDB(all_data);

  // Add a few more small values. Since they are written after the FIFO
  // eviction, the compaction filter must leave them alone.
  ASSERT_OK(PutWithTTL("foo", "v", 30, &surviving_data));
  ASSERT_OK(PutWithTTL("bar", "v", 30, &surviving_data));

  // There is enough room for the flush, so FIFO eviction does not trigger
  // again.
  ASSERT_OK(blob_db_->Flush(FlushOptions()));
  ASSERT_EQ(1, num_files_evicted());

  // Manually compact and check that the compaction filter evicts the keys
  // with expiration < 60.
  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Every key with expiration < 60, plus large_key1, is filtered out by the
  // compaction filter.
  ASSERT_EQ(expected_evicted_keys + 1,
            stats->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
  ASSERT_EQ(1, num_files_evicted());
  ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
  surviving_data["large_key2"] = big_value;
  surviving_data["large_key3"] = big_value;
  VerifyDB(surviving_data);
}
1344
// Fills kNumBlobFiles non-TTL blob files with large values, adds a few
// small and/or TTL values that GC should ignore, runs a manual compaction
// with GC enabled and verifies that the blobs from the files below the cutoff
// were relocated to new files, as seen through the blob indexes, the GC
// statistics and the list of live immutable non-TTL files.
TEST_F(BlobDBTest, GarbageCollection) {
  constexpr size_t kNumPuts = 1 << 10;

  constexpr uint64_t kExpiration = 1000;
  constexpr uint64_t kCompactTime = 500;

  constexpr uint64_t kKeySize = 7;  // "key" + 4 digits

  constexpr uint64_t kSmallValueSize = 1 << 6;
  constexpr uint64_t kLargeValueSize = 1 << 8;
  constexpr uint64_t kMinBlobSize = 1 << 7;
  static_assert(kSmallValueSize < kMinBlobSize, "");
  static_assert(kLargeValueSize > kMinBlobSize, "");

  // Size the blob files so that each one holds exactly kBlobsPerFile large
  // blobs.
  constexpr size_t kBlobsPerFile = 8;
  constexpr size_t kNumBlobFiles = kNumPuts / kBlobsPerFile;
  constexpr uint64_t kBlobFileSize =
      BlobLogHeader::kSize +
      (BlobLogRecord::kHeaderSize + kKeySize + kLargeValueSize) * kBlobsPerFile;

  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = kMinBlobSize;
  bdb_options.blob_file_size = kBlobFileSize;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 0.25;
  bdb_options.disable_background_tasks = true;

  Options options;
  options.env = mock_env_.get();
  options.statistics = CreateDBStatistics();

  Open(bdb_options, options);

  std::map<std::string, std::string> data;
  std::map<std::string, KeyVersion> blob_value_versions;
  std::map<std::string, BlobIndexVersion> blob_index_versions;

  Random rnd(301);

  // Add a bunch of large non-TTL values. These will be written to non-TTL
  // blob files and will be subject to GC.
  for (size_t i = 0; i < kNumPuts; ++i) {
    std::ostringstream oss;
    oss << "key" << std::setw(4) << std::setfill('0') << i;

    const std::string key(oss.str());
    const std::string value(
        test::RandomHumanReadableString(&rnd, kLargeValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;

    ASSERT_OK(Put(key, value));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);

    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
    // File numbers start at 1 and every file takes kBlobsPerFile blobs.
    // Derive the number from the constant rather than a hard-coded shift so
    // it stays correct if kBlobsPerFile changes.
    blob_index_versions[key] =
        BlobIndexVersion(key, /* file_number */ i / kBlobsPerFile + 1,
                         kNoExpiration, sequence, kTypeBlobIndex);
  }

  // Add some small and/or TTL values that will be ignored during GC.
  // First, add a large TTL value that will be written to its own TTL blob
  // file.
  {
    const std::string key("key2000");
    const std::string value(
        test::RandomHumanReadableString(&rnd, kLargeValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;

    ASSERT_OK(PutUntil(key, value, kExpiration));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);

    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
    blob_index_versions[key] =
        BlobIndexVersion(key, /* file_number */ kNumBlobFiles + 1, kExpiration,
                         sequence, kTypeBlobIndex);
  }

  // Now add a small TTL value (which will be inlined).
  {
    const std::string key("key3000");
    const std::string value(
        test::RandomHumanReadableString(&rnd, kSmallValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;

    ASSERT_OK(PutUntil(key, value, kExpiration));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);

    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeBlobIndex);
    blob_index_versions[key] = BlobIndexVersion(
        key, kInvalidBlobFileNumber, kExpiration, sequence, kTypeBlobIndex);
  }

  // Finally, add a small non-TTL value (which will be stored as a regular
  // value).
  {
    const std::string key("key4000");
    const std::string value(
        test::RandomHumanReadableString(&rnd, kSmallValueSize));
    const SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;

    ASSERT_OK(Put(key, value));
    ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);

    data[key] = value;
    blob_value_versions[key] = KeyVersion(key, value, sequence, kTypeValue);
    blob_index_versions[key] = BlobIndexVersion(
        key, kInvalidBlobFileNumber, kNoExpiration, sequence, kTypeValue);
  }

  VerifyDB(data);
  VerifyBaseDB(blob_value_versions);
  VerifyBaseDBBlobIndex(blob_index_versions);

  // At this point, we should have 128 immutable non-TTL files with file numbers
  // 1..128.
  {
    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
    for (size_t i = 0; i < kNumBlobFiles; ++i) {
      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), i + 1);
      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
                kBlobFileSize + BlobLogFooter::kSize);
    }
  }

  mock_env_->set_current_time(kCompactTime);

  ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // We expect the data to remain the same and the blobs from the oldest N files
  // to be moved to new files. Sequence numbers get zeroed out during the
  // compaction.
  VerifyDB(data);

  for (auto &pair : blob_value_versions) {
    KeyVersion &version = pair.second;
    version.sequence = 0;
  }

  VerifyBaseDB(blob_value_versions);

  // Files numbered 1..cutoff are the ones GC is expected to rewrite.
  const uint64_t cutoff = static_cast<uint64_t>(
      bdb_options.garbage_collection_cutoff * kNumBlobFiles);
  for (auto &pair : blob_index_versions) {
    BlobIndexVersion &version = pair.second;

    version.sequence = 0;

    // Inlined and plain values do not reference a blob file.
    if (version.file_number == kInvalidBlobFileNumber) {
      continue;
    }

    // Files above the cutoff (including the TTL file) are left untouched.
    if (version.file_number > cutoff) {
      continue;
    }

    // Relocated blobs land in new files numbered after the TTL blob file
    // (kNumBlobFiles + 1), preserving their relative order.
    version.file_number += kNumBlobFiles + 1;
  }

  VerifyBaseDBBlobIndex(blob_index_versions);

  const Statistics *const statistics = options.statistics.get();
  // Use a gtest assertion rather than assert() so the check is not compiled
  // out in NDEBUG builds.
  ASSERT_TRUE(statistics != nullptr);

  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
            cutoff * kBlobsPerFile);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
            cutoff * kBlobsPerFile * kLargeValueSize);

  // At this point, we should have 128 immutable non-TTL files with file numbers
  // 33..128 and 130..161. (129 was taken by the TTL blob file.)
  {
    auto live_imm_files = blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
    ASSERT_EQ(live_imm_files.size(), kNumBlobFiles);
    for (size_t i = 0; i < kNumBlobFiles; ++i) {
      uint64_t expected_file_number = i + cutoff + 1;
      // Skip over the number used by the TTL blob file.
      if (expected_file_number > kNumBlobFiles) {
        ++expected_file_number;
      }

      ASSERT_EQ(live_imm_files[i]->BlobFileNumber(), expected_file_number);
      ASSERT_EQ(live_imm_files[i]->GetFileSize(),
                kBlobFileSize + BlobLogFooter::kSize);
    }
  }
}
1536
// Plants an unparsable blob index in the base DB and checks that a
// compaction with GC enabled fails with Corruption, and that the GC
// statistics reflect the partial work done before the failure.
TEST_F(BlobDBTest, GarbageCollectionFailure) {
  BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 0;
  bdb_options.enable_garbage_collection = true;
  bdb_options.garbage_collection_cutoff = 1.0;
  bdb_options.disable_background_tasks = true;

  Options db_options;
  db_options.statistics = CreateDBStatistics();

  Open(bdb_options, db_options);

  // Write a couple of valid blobs. Check the statuses: the expectations
  // below only make sense if both writes actually succeeded.
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("dead", "beef"));

  // Write a fake blob reference into the base DB that cannot be parsed.
  WriteBatch batch;
  ASSERT_OK(WriteBatchInternal::PutBlobIndex(
      &batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
      "not a valid blob index"));
  ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));

  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);
  auto blob_file = blob_files[0];
  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));

  ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
                  .IsCorruption());

  const Statistics *const statistics = db_options.statistics.get();
  // Use a gtest assertion rather than assert() so the check is not compiled
  // out in NDEBUG builds.
  ASSERT_TRUE(statistics != nullptr);

  // The two valid blobs ("bar" + "beef" = 7 bytes) are relocated to one new
  // file before the bad index is hit; no file counts as fully collected.
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
  ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
}
1577
1578 // File should be evicted after expiration.
TEST_F(BlobDBTest, EvictExpiredFile) {
  Options options;
  options.env = mock_env_.get();
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = 100;
  bdb_options.min_blob_size = 0;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options, options);

  // Number of blob files / obsolete blob files currently tracked by the DB.
  const auto num_blob_files = [this]() {
    return blob_db_impl()->TEST_GetBlobFiles().size();
  };
  const auto num_obsolete_files = [this]() {
    return blob_db_impl()->TEST_GetObsoleteFiles().size();
  };

  // Write one TTL key at mock time 50.
  mock_env_->set_current_time(50);
  std::map<std::string, std::string> expected;
  ASSERT_OK(PutWithTTL("foo", "bar", 100, &expected));

  auto files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, files.size());
  auto ttl_file = files[0];
  ASSERT_FALSE(ttl_file->Immutable());
  ASSERT_FALSE(ttl_file->Obsolete());
  VerifyDB(expected);

  // Advance the clock; the key should have expired by now.
  mock_env_->set_current_time(250);
  blob_db_impl()->TEST_EvictExpiredFiles();
  // The file is still listed, but it is now closed and marked obsolete.
  ASSERT_EQ(1, num_blob_files());
  ASSERT_EQ(1, num_obsolete_files());
  ASSERT_TRUE(ttl_file->Immutable());
  ASSERT_TRUE(ttl_file->Obsolete());

  blob_db_impl()->TEST_DeleteObsoleteFiles();
  ASSERT_EQ(0, num_blob_files());
  ASSERT_EQ(0, num_obsolete_files());

  // The blob index still exists in the LSM tree after the blob file is gone;
  // make sure a read reports NotFound and leaves the output untouched rather
  // than returning garbage.
  std::string read_value;
  ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &read_value).IsNotFound());
  ASSERT_TRUE(read_value.empty());
}
1612
// Obsolete blob files must not be deleted while file deletions are disabled.
// After two DisableFileDeletions calls, deletions resume either after one
// EnableFileDeletions(true) or after two EnableFileDeletions(false) calls.
TEST_F(BlobDBTest, DisableFileDeletions) {
  BlobDBOptions bdb_options;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  // Number of blob files / obsolete blob files currently tracked by the DB.
  const auto num_blob_files = [this]() {
    return blob_db_impl()->TEST_GetBlobFiles().size();
  };
  const auto num_obsolete_files = [this]() {
    return blob_db_impl()->TEST_GetObsoleteFiles().size();
  };

  std::map<std::string, std::string> expected;
  for (bool force : {true, false}) {
    // Create one blob file, close it and mark it obsolete.
    ASSERT_OK(Put("foo", "v", &expected));
    auto files = blob_db_impl()->TEST_GetBlobFiles();
    ASSERT_EQ(1, files.size());
    auto target_file = files[0];
    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(target_file));
    blob_db_impl()->TEST_ObsoleteBlobFile(target_file);
    ASSERT_EQ(1, num_blob_files());
    ASSERT_EQ(1, num_obsolete_files());

    // Disable file deletions twice.
    ASSERT_OK(blob_db_->DisableFileDeletions());
    ASSERT_OK(blob_db_->DisableFileDeletions());

    // With deletions disabled, the obsolete file has to stay around.
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    ASSERT_EQ(1, num_blob_files());
    ASSERT_EQ(1, num_obsolete_files());
    VerifyDB(expected);

    // Enable file deletions once. With force=true that is enough; otherwise
    // a second enable call is needed to balance the two disable calls.
    ASSERT_OK(blob_db_->EnableFileDeletions(force));
    blob_db_impl()->TEST_DeleteObsoleteFiles();
    if (!force) {
      // Still disabled after a single non-forced enable.
      ASSERT_EQ(1, num_blob_files());
      ASSERT_EQ(1, num_obsolete_files());
      VerifyDB(expected);
      // Enable a second time.
      ASSERT_OK(blob_db_->EnableFileDeletions(false));
      blob_db_impl()->TEST_DeleteObsoleteFiles();
    }

    // Whatever the value of `force`, the file should be gone by now.
    ASSERT_EQ(0, num_blob_files());
    ASSERT_EQ(0, num_obsolete_files());
    VerifyDB({});
  }
}
1653
// Checks that the blob file <-> SST file mapping is kept up to date as flush
// and compaction results are fed to the BlobDB implementation, and that a blob
// file is reported obsolete only once it has no linked SSTs left and the
// sequence number at which it became immutable has been flushed.
TEST_F(BlobDBTest, MaintainBlobFileToSstMapping) {
  BlobDBOptions bdb_options;
  bdb_options.enable_garbage_collection = true;
  bdb_options.disable_background_tasks = true;
  Open(bdb_options);

  // Register some dummy blob files.
  blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
  blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
  blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
  blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
  blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);

  // Initialize the blob <-> SST file mapping. First, add some SST files with
  // blob file references, then some without.
  std::vector<LiveFileMetaData> live_files;

  for (uint64_t i = 1; i <= 10; ++i) {
    LiveFileMetaData live_file;
    live_file.file_number = i;
    // SSTs 1..10 reference blob files 1..5 round-robin (1 and 6 -> 1, etc.).
    live_file.oldest_blob_file_number = ((i - 1) % 5) + 1;

    live_files.emplace_back(live_file);
  }

  for (uint64_t i = 11; i <= 20; ++i) {
    LiveFileMetaData live_file;
    live_file.file_number = i;

    live_files.emplace_back(live_file);
  }

  blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files);

  // Snapshot of the registered blob files; entry i describes blob file i + 1.
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();

  ASSERT_EQ(blob_files.size(), 5);

  // Verifies the complete observable state in one go:
  //  - the set of SSTs linked to each blob file,
  //  - the obsolete flag of each blob file,
  //  - that the live immutable non-TTL list contains exactly the non-obsolete
  //    blob files, in ascending file number order,
  //  - that the obsolete list contains exactly the obsolete blob files, in
  //    ascending file number order.
  // The assertions inside are fatal only for the helper itself, so every call
  // must be wrapped in ASSERT_NO_FATAL_FAILURE to abort the test on failure.
  const auto verify_state =
      [&](const std::vector<std::unordered_set<uint64_t>> &expected_sst_files,
          const std::vector<bool> &expected_obsolete) {
        ASSERT_EQ(expected_sst_files.size(), blob_files.size());
        ASSERT_EQ(expected_obsolete.size(), blob_files.size());

        std::vector<uint64_t> expected_live_numbers;
        std::vector<uint64_t> expected_obsolete_numbers;

        for (size_t i = 0; i < blob_files.size(); ++i) {
          const auto &blob_file = blob_files[i];
          ASSERT_EQ(blob_file->GetLinkedSstFiles(), expected_sst_files[i]);
          ASSERT_EQ(blob_file->Obsolete(), expected_obsolete[i]);

          const uint64_t blob_file_number = static_cast<uint64_t>(i + 1);
          if (expected_obsolete[i]) {
            expected_obsolete_numbers.push_back(blob_file_number);
          } else {
            expected_live_numbers.push_back(blob_file_number);
          }
        }

        const auto live_imm_files =
            blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
        ASSERT_EQ(live_imm_files.size(), expected_live_numbers.size());
        for (size_t i = 0; i < live_imm_files.size(); ++i) {
          ASSERT_EQ(live_imm_files[i]->BlobFileNumber(),
                    expected_live_numbers[i]);
        }

        const auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
        ASSERT_EQ(obsolete_files.size(), expected_obsolete_numbers.size());
        for (size_t i = 0; i < obsolete_files.size(); ++i) {
          ASSERT_EQ(obsolete_files[i]->BlobFileNumber(),
                    expected_obsolete_numbers[i]);
        }
      };

  // Check that the blob <-> SST mappings have been correctly initialized.
  ASSERT_NO_FATAL_FAILURE(
      verify_state({{1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}},
                   {false, false, false, false, false}));

  // Simulate a flush where the SST does not reference any blob files.
  {
    FlushJobInfo info{};
    info.file_number = 21;
    info.smallest_seqno = 1;
    info.largest_seqno = 100;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(
        verify_state({{1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}},
                     {false, false, false, false, false}));
  }

  // Simulate a flush where the SST references a blob file.
  {
    FlushJobInfo info{};
    info.file_number = 22;
    info.oldest_blob_file_number = 5;
    info.smallest_seqno = 101;
    info.largest_seqno = 200;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(
        verify_state({{1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}},
                     {false, false, false, false, false}));
  }

  // Simulate a compaction. Some inputs and outputs have blob file references,
  // some don't. There is also a trivial move (which means the SST appears on
  // both the input and the output list). Blob file 1 loses all its linked SSTs,
  // and since it got marked immutable at sequence number 200 which has already
  // been flushed, it can be marked obsolete.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 1, 1});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 2, 2});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 6, 1});
    info.input_file_infos.emplace_back(
        CompactionFileInfo{1, 11, kInvalidBlobFileNumber});
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 23, 3});
    info.output_file_infos.emplace_back(
        CompactionFileInfo{2, 24, kInvalidBlobFileNumber});

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(
        verify_state({{}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}},
                     {true, false, false, false, false}));
  }

  // Simulate a failed compaction. No mappings should be updated.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
    info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});
    info.status = Status::Corruption();

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(
        verify_state({{}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}},
                     {true, false, false, false, false}));
  }

  // Simulate another compaction. Blob file 2 loses all its linked SSTs
  // but since it got marked immutable at sequence number 300 which hasn't
  // been flushed yet, it cannot be marked obsolete at this point.
  {
    CompactionJobInfo info{};
    info.input_file_infos.emplace_back(CompactionFileInfo{1, 7, 2});
    info.input_file_infos.emplace_back(CompactionFileInfo{2, 22, 5});
    info.output_file_infos.emplace_back(CompactionFileInfo{2, 25, 3});

    blob_db_impl()->TEST_ProcessCompactionJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(
        verify_state({{}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}},
                     {true, false, false, false, false}));
  }

  // Simulate a flush with largest sequence number 300. This will make it
  // possible to mark blob file 2 obsolete.
  {
    FlushJobInfo info{};
    info.file_number = 26;
    info.smallest_seqno = 201;
    info.largest_seqno = 300;

    blob_db_impl()->TEST_ProcessFlushJobInfo(info);

    ASSERT_NO_FATAL_FAILURE(
        verify_state({{}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}},
                     {true, true, false, false, false}));
  }
}
1912
// Runs the expired-file eviction background task concurrently with DB
// shutdown, so that a broken "wait for background tasks" step in the BlobDB
// destructor shows up as a data race (e.g. under TSAN).
TEST_F(BlobDBTest, ShutdownWait) {
  BlobDBOptions bdb_options;
  bdb_options.ttl_range_secs = 100;
  bdb_options.min_blob_size = 0;
  // Background tasks must be running: the test is about shutting them down.
  bdb_options.disable_background_tasks = false;
  Options options;
  options.env = mock_env_.get();

  // Interleave the test thread with EvictExpiredFiles(). Each pair is
  // {predecessor, successor}: the successor sync point does not proceed until
  // the predecessor has been reached.
  SyncPoint::GetInstance()->LoadDependency({
      {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
      {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
      {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
      {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
  });
  // Force all tasks to be scheduled immediately by moving their deadline into
  // the past.
  SyncPoint::GetInstance()->SetCallBack(
      "TimeQueue::Add:item.end", [](void *arg) {
        auto *tp = static_cast<std::chrono::steady_clock::time_point *>(arg);
        *tp =
            std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
      });

  SyncPoint::GetInstance()->SetCallBack(
      "BlobDBImpl::EvictExpiredFiles:cb", [](void * /*arg*/) {
        // Sleep 3 ms to increase the chance of data race.
        // We've synced up the code so that EvictExpiredFiles()
        // is called concurrently with ~BlobDBImpl().
        // ~BlobDBImpl() is supposed to wait for all background
        // task to shutdown before doing anything else. In order
        // to use the same test to reproduce a bug of the waiting
        // logic, we wait a little bit here, so that TSAN can
        // catch the data race.
        // We should improve the test if we find a better way.
        Env::Default()->SleepForMicroseconds(3000);
      });

  SyncPoint::GetInstance()->EnableProcessing();

  Open(bdb_options, options);
  mock_env_->set_current_time(50);
  std::map<std::string, std::string> data;
  ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
  ASSERT_EQ(1, blob_files.size());
  auto blob_file = blob_files[0];
  ASSERT_FALSE(blob_file->Immutable());
  ASSERT_FALSE(blob_file->Obsolete());
  VerifyDB(data);

  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
  mock_env_->set_current_time(250);
  // The key should have expired by now.
  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");

  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
  TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
  Close();

  SyncPoint::GetInstance()->DisableProcessing();
  // SyncPoint is a process-wide singleton: drop the callbacks installed above
  // so they do not linger for whatever runs next in this binary.
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
1974
1975 } // namespace blob_db
1976 } // namespace ROCKSDB_NAMESPACE
1977
// Test driver: runs the BlobDB tests defined above.
main(int argc,char ** argv)1979 int main(int argc, char** argv) {
1980 ::testing::InitGoogleTest(&argc, argv);
1981 return RUN_ALL_TESTS();
1982 }
1983
1984 #else
1985 #include <stdio.h>
1986
// Stub entry point used when BlobDB is compiled out (ROCKSDB_LITE): reports
// that the tests were skipped and exits successfully.
int main(int, char**) {
  static const char kSkipNotice[] =
      "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n";
  fprintf(stderr, "%s", kSkipNotice);
  return 0;
}
1991
1992 #endif // !ROCKSDB_LITE
1993