1
2 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/blob_db/blob_db_impl.h"
9 #include <algorithm>
10 #include <cinttypes>
11 #include <iomanip>
12 #include <memory>
13 #include <sstream>
14
15 #include "db/blob/blob_index.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/write_batch_internal.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/file_util.h"
20 #include "file/filename.h"
21 #include "file/random_access_file_reader.h"
22 #include "file/sst_file_manager_impl.h"
23 #include "file/writable_file_writer.h"
24 #include "logging/logging.h"
25 #include "monitoring/instrumented_mutex.h"
26 #include "monitoring/statistics.h"
27 #include "rocksdb/convenience.h"
28 #include "rocksdb/env.h"
29 #include "rocksdb/iterator.h"
30 #include "rocksdb/utilities/stackable_db.h"
31 #include "rocksdb/utilities/transaction.h"
32 #include "table/block_based/block.h"
33 #include "table/block_based/block_based_table_builder.h"
34 #include "table/block_based/block_builder.h"
35 #include "table/meta_blocks.h"
36 #include "test_util/sync_point.h"
37 #include "util/cast_util.h"
38 #include "util/crc32c.h"
39 #include "util/mutexlock.h"
40 #include "util/random.h"
41 #include "util/stop_watch.h"
42 #include "util/timer_queue.h"
43 #include "utilities/blob_db/blob_compaction_filter.h"
44 #include "utilities/blob_db/blob_db_iterator.h"
45 #include "utilities/blob_db/blob_db_listener.h"
46
namespace {
// File-local constant, presumably the block-based table format version
// (inferred from the name only).
// NOTE(review): no use of it is visible in this part of the file — confirm it
// is still referenced elsewhere.
int kBlockBasedTableVersionFormat{2};
}  // namespace
50
51 namespace ROCKSDB_NAMESPACE {
52 namespace blob_db {
53
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const54 bool BlobFileComparator::operator()(
55 const std::shared_ptr<BlobFile>& lhs,
56 const std::shared_ptr<BlobFile>& rhs) const {
57 return lhs->BlobFileNumber() > rhs->BlobFileNumber();
58 }
59
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const60 bool BlobFileComparatorTTL::operator()(
61 const std::shared_ptr<BlobFile>& lhs,
62 const std::shared_ptr<BlobFile>& rhs) const {
63 assert(lhs->HasTTL() && rhs->HasTTL());
64 if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
65 return true;
66 }
67 if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
68 return false;
69 }
70 return lhs->BlobFileNumber() < rhs->BlobFileNumber();
71 }
72
BlobDBImpl(const std::string & dbname,const BlobDBOptions & blob_db_options,const DBOptions & db_options,const ColumnFamilyOptions & cf_options)73 BlobDBImpl::BlobDBImpl(const std::string& dbname,
74 const BlobDBOptions& blob_db_options,
75 const DBOptions& db_options,
76 const ColumnFamilyOptions& cf_options)
77 : BlobDB(),
78 dbname_(dbname),
79 db_impl_(nullptr),
80 env_(db_options.env),
81 bdb_options_(blob_db_options),
82 db_options_(db_options),
83 cf_options_(cf_options),
84 env_options_(db_options),
85 statistics_(db_options_.statistics.get()),
86 next_file_number_(1),
87 flush_sequence_(0),
88 closed_(true),
89 open_file_count_(0),
90 total_blob_size_(0),
91 live_sst_size_(0),
92 fifo_eviction_seq_(0),
93 evict_expiration_up_to_(0),
94 debug_level_(0) {
95 blob_dir_ = (bdb_options_.path_relative)
96 ? dbname + "/" + bdb_options_.blob_dir
97 : bdb_options_.blob_dir;
98 env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
99 }
100
~BlobDBImpl()101 BlobDBImpl::~BlobDBImpl() {
102 tqueue_.shutdown();
103 // CancelAllBackgroundWork(db_, true);
104 Status s __attribute__((__unused__)) = Close();
105 assert(s.ok());
106 }
107
Close()108 Status BlobDBImpl::Close() {
109 if (closed_) {
110 return Status::OK();
111 }
112 closed_ = true;
113
114 // Close base DB before BlobDBImpl destructs to stop event listener and
115 // compaction filter call.
116 Status s = db_->Close();
117 // delete db_ anyway even if close failed.
118 delete db_;
119 // Reset pointers to avoid StackableDB delete the pointer again.
120 db_ = nullptr;
121 db_impl_ = nullptr;
122 if (!s.ok()) {
123 return s;
124 }
125
126 s = SyncBlobFiles();
127 return s;
128 }
129
GetBlobDBOptions() const130 BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
131
// Opens BlobDB and the underlying base DB (default column family only).
// Steps, in order:
//   1. Validate options: a blob directory must be set, no user compaction
//      filter/factory is allowed, and the GC cutoff must be in [0.0, 1.0].
//   2. Create the info log if none was supplied.
//   3. Create/open the blob directory and load the existing blob files.
//   4. Install the BlobDB listener and blob-index compaction filter factory
//      (GC or non-GC variant) into the options, then open the base DB.
//   5. With GC enabled: link SSTs to their oldest blob files, mark
//      unreferenced blob files obsolete, and re-enable auto compactions if the
//      user had them enabled.
//   6. Clean up trash files in the blob dir, refresh the live SST size and
//      start background tasks (unless disabled).
// closed_ is cleared only on full success; every early return leaves it true.
// NOTE(review): if a step after DB::Open fails, db_ is already set but
// closed_ is still true, so Close() will not close it — TODO confirm the
// StackableDB base destructor handles that cleanup.
Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  assert(db_ == nullptr);

  if (blob_dir_.empty()) {
    return Status::NotSupported("No blob directory in options");
  }

  // BlobDB installs its own compaction filter factory below, so a
  // user-supplied filter cannot be combined with it.
  if (cf_options_.compaction_filter != nullptr ||
      cf_options_.compaction_filter_factory != nullptr) {
    return Status::NotSupported("Blob DB doesn't support compaction filter.");
  }

  if (bdb_options_.garbage_collection_cutoff < 0.0 ||
      bdb_options_.garbage_collection_cutoff > 1.0) {
    return Status::InvalidArgument(
        "Garbage collection cutoff must be in the interval [0.0, 1.0]");
  }

  // Temporarily disable compactions in the base DB during open; save the user
  // defined value beforehand so we can restore it once BlobDB is initialized.
  // Note: this is only needed if garbage collection is enabled.
  const bool disable_auto_compactions = cf_options_.disable_auto_compactions;

  if (bdb_options_.enable_garbage_collection) {
    cf_options_.disable_auto_compactions = true;
  }

  Status s;

  // Create info log.
  if (db_options_.info_log == nullptr) {
    s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
    if (!s.ok()) {
      return s;
    }
  }

  ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");

  // Open blob directory.
  // A CreateDirIfMissing() failure is only logged: s is overwritten by
  // NewDirectory() below, and that result decides whether Open fails.
  s = env_->CreateDirIfMissing(blob_dir_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to create blob_dir %s, status: %s",
                    blob_dir_.c_str(), s.ToString().c_str());
  }
  s = env_->NewDirectory(blob_dir_, &dir_ent_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
                    s.ToString().c_str());
    return s;
  }

  // Open blob files.
  s = OpenAllBlobFiles();
  if (!s.ok()) {
    return s;
  }

  // Update options
  // Hook BlobDB into the base DB: an event listener plus a compaction filter
  // factory for blob index entries, in GC or non-GC flavour.
  if (bdb_options_.enable_garbage_collection) {
    db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
    cf_options_.compaction_filter_factory =
        std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
                                                             statistics_);
  } else {
    db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
    cf_options_.compaction_filter_factory =
        std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
                                                           statistics_);
  }

  // Open base db.
  ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
  s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
  if (!s.ok()) {
    return s;
  }
  // Cache the root DBImpl behind db_ (checked downcast).
  db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());

  // Initialize SST file <-> oldest blob file mapping if garbage collection
  // is enabled.
  if (bdb_options_.enable_garbage_collection) {
    std::vector<LiveFileMetaData> live_files;
    db_->GetLiveFilesMetaData(&live_files);

    InitializeBlobFileToSstMapping(live_files);

    MarkUnreferencedBlobFilesObsoleteDuringOpen();

    // Restore the user's setting: compactions were only disabled for open.
    if (!disable_auto_compactions) {
      s = db_->EnableAutoCompaction(*handles);
      if (!s.ok()) {
        ROCKS_LOG_ERROR(
            db_options_.info_log,
            "Failed to enable automatic compactions during open, status: %s",
            s.ToString().c_str());
        return s;
      }
    }
  }

  // Add trash files in blob dir to file delete scheduler.
  // NOTE(review): sst_file_manager may be unset, making sfm null; assumes
  // CleanupDirectory accepts a null sfm — confirm.
  SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
      db_impl_->immutable_db_options().sst_file_manager.get());
  DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);

  UpdateLiveSSTSize();

  // Start background jobs.
  if (!bdb_options_.disable_background_tasks) {
    StartBackgroundTasks();
  }

  ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
  bdb_options_.Dump(db_options_.info_log.get());
  // Only now does Close() (and thus the destructor) tear down the base DB.
  closed_ = false;
  return s;
}
253
StartBackgroundTasks()254 void BlobDBImpl::StartBackgroundTasks() {
255 // store a call to a member function and object
256 tqueue_.add(
257 kReclaimOpenFilesPeriodMillisecs,
258 std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
259 tqueue_.add(
260 kDeleteObsoleteFilesPeriodMillisecs,
261 std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
262 tqueue_.add(kSanityCheckPeriodMillisecs,
263 std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
264 tqueue_.add(
265 kEvictExpiredFilesPeriodMillisecs,
266 std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
267 }
268
GetAllBlobFiles(std::set<uint64_t> * file_numbers)269 Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
270 assert(file_numbers != nullptr);
271 std::vector<std::string> all_files;
272 Status s = env_->GetChildren(blob_dir_, &all_files);
273 if (!s.ok()) {
274 ROCKS_LOG_ERROR(db_options_.info_log,
275 "Failed to get list of blob files, status: %s",
276 s.ToString().c_str());
277 return s;
278 }
279
280 for (const auto& file_name : all_files) {
281 uint64_t file_number;
282 FileType type;
283 bool success = ParseFileName(file_name, &file_number, &type);
284 if (success && type == kBlobFile) {
285 file_numbers->insert(file_number);
286 } else {
287 ROCKS_LOG_WARN(db_options_.info_log,
288 "Skipping file in blob directory: %s", file_name.c_str());
289 }
290 }
291
292 return s;
293 }
294
OpenAllBlobFiles()295 Status BlobDBImpl::OpenAllBlobFiles() {
296 std::set<uint64_t> file_numbers;
297 Status s = GetAllBlobFiles(&file_numbers);
298 if (!s.ok()) {
299 return s;
300 }
301
302 if (!file_numbers.empty()) {
303 next_file_number_.store(*file_numbers.rbegin() + 1);
304 }
305
306 std::ostringstream blob_file_oss;
307 std::ostringstream live_imm_oss;
308 std::ostringstream obsolete_file_oss;
309
310 for (auto& file_number : file_numbers) {
311 std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
312 this, blob_dir_, file_number, db_options_.info_log.get());
313 blob_file->MarkImmutable(/* sequence */ 0);
314
315 // Read file header and footer
316 Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
317 if (read_metadata_status.IsCorruption()) {
318 // Remove incomplete file.
319 if (!obsolete_files_.empty()) {
320 obsolete_file_oss << ", ";
321 }
322 obsolete_file_oss << file_number;
323
324 ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
325 continue;
326 } else if (!read_metadata_status.ok()) {
327 ROCKS_LOG_ERROR(db_options_.info_log,
328 "Unable to read metadata of blob file %" PRIu64
329 ", status: '%s'",
330 file_number, read_metadata_status.ToString().c_str());
331 return read_metadata_status;
332 }
333
334 total_blob_size_ += blob_file->GetFileSize();
335
336 if (!blob_files_.empty()) {
337 blob_file_oss << ", ";
338 }
339 blob_file_oss << file_number;
340
341 blob_files_[file_number] = blob_file;
342
343 if (!blob_file->HasTTL()) {
344 if (!live_imm_non_ttl_blob_files_.empty()) {
345 live_imm_oss << ", ";
346 }
347 live_imm_oss << file_number;
348
349 live_imm_non_ttl_blob_files_[file_number] = blob_file;
350 }
351 }
352
353 ROCKS_LOG_INFO(db_options_.info_log,
354 "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
355 blob_file_oss.str().c_str());
356 ROCKS_LOG_INFO(
357 db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
358 live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
359 ROCKS_LOG_INFO(db_options_.info_log,
360 "Found %" ROCKSDB_PRIszt
361 " incomplete or corrupted blob files: %s",
362 obsolete_files_.size(), obsolete_file_oss.str().c_str());
363 return s;
364 }
365
366 template <typename Linker>
LinkSstToBlobFileImpl(uint64_t sst_file_number,uint64_t blob_file_number,Linker linker)367 void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
368 uint64_t blob_file_number,
369 Linker linker) {
370 assert(bdb_options_.enable_garbage_collection);
371 assert(blob_file_number != kInvalidBlobFileNumber);
372
373 auto it = blob_files_.find(blob_file_number);
374 if (it == blob_files_.end()) {
375 ROCKS_LOG_WARN(db_options_.info_log,
376 "Blob file %" PRIu64
377 " not found while trying to link "
378 "SST file %" PRIu64,
379 blob_file_number, sst_file_number);
380 return;
381 }
382
383 BlobFile* const blob_file = it->second.get();
384 assert(blob_file);
385
386 linker(blob_file, sst_file_number);
387
388 ROCKS_LOG_INFO(db_options_.info_log,
389 "Blob file %" PRIu64 " linked to SST file %" PRIu64,
390 blob_file_number, sst_file_number);
391 }
392
LinkSstToBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)393 void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
394 uint64_t blob_file_number) {
395 auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
396 WriteLock file_lock(&blob_file->mutex_);
397 blob_file->LinkSstFile(sst_file);
398 };
399
400 LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
401 }
402
LinkSstToBlobFileNoLock(uint64_t sst_file_number,uint64_t blob_file_number)403 void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
404 uint64_t blob_file_number) {
405 auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
406 blob_file->LinkSstFile(sst_file);
407 };
408
409 LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
410 }
411
UnlinkSstFromBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)412 void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
413 uint64_t blob_file_number) {
414 assert(bdb_options_.enable_garbage_collection);
415 assert(blob_file_number != kInvalidBlobFileNumber);
416
417 auto it = blob_files_.find(blob_file_number);
418 if (it == blob_files_.end()) {
419 ROCKS_LOG_WARN(db_options_.info_log,
420 "Blob file %" PRIu64
421 " not found while trying to unlink "
422 "SST file %" PRIu64,
423 blob_file_number, sst_file_number);
424 return;
425 }
426
427 BlobFile* const blob_file = it->second.get();
428 assert(blob_file);
429
430 {
431 WriteLock file_lock(&blob_file->mutex_);
432 blob_file->UnlinkSstFile(sst_file_number);
433 }
434
435 ROCKS_LOG_INFO(db_options_.info_log,
436 "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
437 blob_file_number, sst_file_number);
438 }
439
InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)440 void BlobDBImpl::InitializeBlobFileToSstMapping(
441 const std::vector<LiveFileMetaData>& live_files) {
442 assert(bdb_options_.enable_garbage_collection);
443
444 for (const auto& live_file : live_files) {
445 const uint64_t sst_file_number = live_file.file_number;
446 const uint64_t blob_file_number = live_file.oldest_blob_file_number;
447
448 if (blob_file_number == kInvalidBlobFileNumber) {
449 continue;
450 }
451
452 LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
453 }
454 }
455
ProcessFlushJobInfo(const FlushJobInfo & info)456 void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
457 assert(bdb_options_.enable_garbage_collection);
458
459 WriteLock lock(&mutex_);
460
461 if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
462 LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
463 }
464
465 assert(flush_sequence_ < info.largest_seqno);
466 flush_sequence_ = info.largest_seqno;
467
468 MarkUnreferencedBlobFilesObsolete();
469 }
470
ProcessCompactionJobInfo(const CompactionJobInfo & info)471 void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
472 assert(bdb_options_.enable_garbage_collection);
473
474 if (!info.status.ok()) {
475 return;
476 }
477
478 // Note: the same SST file may appear in both the input and the output
479 // file list in case of a trivial move. We walk through the two lists
480 // below in a fashion that's similar to merge sort to detect this.
481
482 auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
483 return lhs.file_number < rhs.file_number;
484 };
485
486 auto inputs = info.input_file_infos;
487 auto iit = inputs.begin();
488 const auto iit_end = inputs.end();
489
490 std::sort(iit, iit_end, cmp);
491
492 auto outputs = info.output_file_infos;
493 auto oit = outputs.begin();
494 const auto oit_end = outputs.end();
495
496 std::sort(oit, oit_end, cmp);
497
498 WriteLock lock(&mutex_);
499
500 while (iit != iit_end && oit != oit_end) {
501 const auto& input = *iit;
502 const auto& output = *oit;
503
504 if (input.file_number == output.file_number) {
505 ++iit;
506 ++oit;
507 } else if (input.file_number < output.file_number) {
508 if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
509 UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
510 }
511
512 ++iit;
513 } else {
514 assert(output.file_number < input.file_number);
515
516 if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
517 LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
518 }
519
520 ++oit;
521 }
522 }
523
524 while (iit != iit_end) {
525 const auto& input = *iit;
526
527 if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
528 UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
529 }
530
531 ++iit;
532 }
533
534 while (oit != oit_end) {
535 const auto& output = *oit;
536
537 if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
538 LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
539 }
540
541 ++oit;
542 }
543
544 MarkUnreferencedBlobFilesObsolete();
545 }
546
MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq)547 bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
548 const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
549 assert(blob_file);
550 assert(!blob_file->HasTTL());
551 assert(blob_file->Immutable());
552 assert(bdb_options_.enable_garbage_collection);
553
554 // Note: FIFO eviction could have marked this file obsolete already.
555 if (blob_file->Obsolete()) {
556 return true;
557 }
558
559 // We cannot mark this file (or any higher-numbered files for that matter)
560 // obsolete if it is referenced by any memtables or SSTs. We keep track of
561 // the SSTs explicitly. To account for memtables, we keep track of the highest
562 // sequence number received in flush notifications, and we do not mark the
563 // blob file obsolete if there are still unflushed memtables from before
564 // the time the blob file was closed.
565 if (blob_file->GetImmutableSequence() > flush_sequence_ ||
566 !blob_file->GetLinkedSstFiles().empty()) {
567 return false;
568 }
569
570 ROCKS_LOG_INFO(db_options_.info_log,
571 "Blob file %" PRIu64 " is no longer needed, marking obsolete",
572 blob_file->BlobFileNumber());
573
574 ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
575 return true;
576 }
577
578 template <class Functor>
MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed)579 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
580 assert(bdb_options_.enable_garbage_collection);
581
582 // Iterate through all live immutable non-TTL blob files, and mark them
583 // obsolete assuming no SST files or memtables rely on the blobs in them.
584 // Note: we need to stop as soon as we find a blob file that has any
585 // linked SSTs (or one potentially referenced by memtables).
586
587 uint64_t obsoleted_files = 0;
588
589 auto it = live_imm_non_ttl_blob_files_.begin();
590 while (it != live_imm_non_ttl_blob_files_.end()) {
591 const auto& blob_file = it->second;
592 assert(blob_file);
593 assert(blob_file->BlobFileNumber() == it->first);
594 assert(!blob_file->HasTTL());
595 assert(blob_file->Immutable());
596
597 // Small optimization: Obsolete() does an atomic read, so we can do
598 // this check without taking a lock on the blob file's mutex.
599 if (blob_file->Obsolete()) {
600 it = live_imm_non_ttl_blob_files_.erase(it);
601 continue;
602 }
603
604 if (!mark_if_needed(blob_file)) {
605 break;
606 }
607
608 it = live_imm_non_ttl_blob_files_.erase(it);
609
610 ++obsoleted_files;
611 }
612
613 if (obsoleted_files > 0) {
614 ROCKS_LOG_INFO(db_options_.info_log,
615 "%" PRIu64 " blob file(s) marked obsolete by GC",
616 obsoleted_files);
617 RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
618 }
619 }
620
MarkUnreferencedBlobFilesObsolete()621 void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
622 const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
623
624 MarkUnreferencedBlobFilesObsoleteImpl(
625 [=](const std::shared_ptr<BlobFile>& blob_file) {
626 WriteLock file_lock(&blob_file->mutex_);
627 return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
628 });
629 }
630
MarkUnreferencedBlobFilesObsoleteDuringOpen()631 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
632 MarkUnreferencedBlobFilesObsoleteImpl(
633 [=](const std::shared_ptr<BlobFile>& blob_file) {
634 return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
635 });
636 }
637
CloseRandomAccessLocked(const std::shared_ptr<BlobFile> & bfile)638 void BlobDBImpl::CloseRandomAccessLocked(
639 const std::shared_ptr<BlobFile>& bfile) {
640 bfile->CloseRandomAccessLocked();
641 open_file_count_--;
642 }
643
GetBlobFileReader(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<RandomAccessFileReader> * reader)644 Status BlobDBImpl::GetBlobFileReader(
645 const std::shared_ptr<BlobFile>& blob_file,
646 std::shared_ptr<RandomAccessFileReader>* reader) {
647 assert(reader != nullptr);
648 bool fresh_open = false;
649 Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
650 if (s.ok() && fresh_open) {
651 assert(*reader != nullptr);
652 open_file_count_++;
653 }
654 return s;
655 }
656
NewBlobFile(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason)657 std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
658 bool has_ttl, const ExpirationRange& expiration_range,
659 const std::string& reason) {
660 assert(has_ttl == (expiration_range.first || expiration_range.second));
661
662 uint64_t file_num = next_file_number_++;
663
664 const uint32_t column_family_id =
665 static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
666 auto blob_file = std::make_shared<BlobFile>(
667 this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
668 bdb_options_.compression, has_ttl, expiration_range);
669
670 ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
671 blob_file->PathName().c_str(), reason.c_str());
672 LogFlush(db_options_.info_log);
673
674 return blob_file;
675 }
676
RegisterBlobFile(std::shared_ptr<BlobFile> blob_file)677 void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
678 const uint64_t blob_file_number = blob_file->BlobFileNumber();
679
680 auto it = blob_files_.lower_bound(blob_file_number);
681 assert(it == blob_files_.end() || it->first != blob_file_number);
682
683 blob_files_.insert(it,
684 std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
685 blob_file_number, std::move(blob_file)));
686 }
687
CreateWriterLocked(const std::shared_ptr<BlobFile> & bfile)688 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
689 std::string fpath(bfile->PathName());
690 std::unique_ptr<WritableFile> wfile;
691
692 Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
693 if (!s.ok()) {
694 ROCKS_LOG_ERROR(db_options_.info_log,
695 "Failed to open blob file for write: %s status: '%s'"
696 " exists: '%s'",
697 fpath.c_str(), s.ToString().c_str(),
698 env_->FileExists(fpath).ToString().c_str());
699 return s;
700 }
701
702 std::unique_ptr<WritableFileWriter> fwriter;
703 fwriter.reset(new WritableFileWriter(
704 NewLegacyWritableFileWrapper(std::move(wfile)), fpath, env_options_));
705
706 uint64_t boffset = bfile->GetFileSize();
707 if (debug_level_ >= 2 && boffset) {
708 ROCKS_LOG_DEBUG(db_options_.info_log,
709 "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
710 boffset);
711 }
712
713 Writer::ElemType et = Writer::kEtNone;
714 if (bfile->file_size_ == BlobLogHeader::kSize) {
715 et = Writer::kEtFileHdr;
716 } else if (bfile->file_size_ > BlobLogHeader::kSize) {
717 et = Writer::kEtRecord;
718 } else if (bfile->file_size_) {
719 ROCKS_LOG_WARN(db_options_.info_log,
720 "Open blob file: %s with wrong size: %" PRIu64,
721 fpath.c_str(), boffset);
722 return Status::Corruption("Invalid blob file size");
723 }
724
725 bfile->log_writer_ = std::make_shared<Writer>(
726 std::move(fwriter), env_, statistics_, bfile->file_number_,
727 bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
728 bfile->log_writer_->last_elem_type_ = et;
729
730 return s;
731 }
732
// Returns the open TTL blob file whose expiration range [first, second)
// contains `expiration`, or nullptr if there is none. Callers in this file
// (SelectBlobFileTTL) hold mutex_ when calling it.
//
// open_ttl_files_ is ordered by (expiration_range_.first, file number), per
// BlobFileComparatorTTL. The probe `tmp` uses the maximum file number, so
// (assuming no real file has that number) nothing compares equal to it.
// equal_range() therefore yields an empty range positioned at the first file
// whose range starts after `expiration`. The candidate is the file just
// before that position.
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
    uint64_t expiration) const {
  if (open_ttl_files_.empty()) {
    return nullptr;
  }

  // Probe object used only for the ordered lookup below.
  std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
  tmp->SetHasTTL(true);
  tmp->expiration_range_ = std::make_pair(expiration, 0);
  tmp->file_number_ = std::numeric_limits<uint64_t>::max();

  auto citr = open_ttl_files_.equal_range(tmp);
  if (citr.first == open_ttl_files_.end()) {
    // Every open file starts at or before `expiration`; only the last one
    // (largest range start) is checked.
    assert(citr.second == open_ttl_files_.end());

    std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
    return (check->expiration_range_.second <= expiration) ? nullptr : check;
  }

  // NOTE(review): given the maximum file number in the probe, this branch
  // appears reachable only if a real file has that number.
  if (citr.first != citr.second) {
    return *(citr.first);
  }

  // Step back to the last file starting at or before `expiration`, if any.
  auto finditr = citr.second;
  if (finditr != open_ttl_files_.begin()) {
    --finditr;
  }

  // b2: the candidate's range ends at or before `expiration`.
  // b1: the candidate starts after `expiration` (only possible when finditr
  //     could not be stepped back from begin()).
  bool b2 = (*finditr)->expiration_range_.second <= expiration;
  bool b1 = (*finditr)->expiration_range_.first > expiration;

  return (b1 || b2) ? nullptr : (*finditr);
}
766
CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<Writer> * writer)767 Status BlobDBImpl::CheckOrCreateWriterLocked(
768 const std::shared_ptr<BlobFile>& blob_file,
769 std::shared_ptr<Writer>* writer) {
770 assert(writer != nullptr);
771 *writer = blob_file->GetWriter();
772 if (*writer != nullptr) {
773 return Status::OK();
774 }
775 Status s = CreateWriterLocked(blob_file);
776 if (s.ok()) {
777 *writer = blob_file->GetWriter();
778 }
779 return s;
780 }
781
CreateBlobFileAndWriter(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason,std::shared_ptr<BlobFile> * blob_file,std::shared_ptr<Writer> * writer)782 Status BlobDBImpl::CreateBlobFileAndWriter(
783 bool has_ttl, const ExpirationRange& expiration_range,
784 const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
785 std::shared_ptr<Writer>* writer) {
786 assert(has_ttl == (expiration_range.first || expiration_range.second));
787 assert(blob_file);
788 assert(writer);
789
790 *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
791 assert(*blob_file);
792
793 // file not visible, hence no lock
794 Status s = CheckOrCreateWriterLocked(*blob_file, writer);
795 if (!s.ok()) {
796 ROCKS_LOG_ERROR(db_options_.info_log,
797 "Failed to get writer for blob file: %s, error: %s",
798 (*blob_file)->PathName().c_str(), s.ToString().c_str());
799 return s;
800 }
801
802 assert(*writer);
803
804 s = (*writer)->WriteHeader((*blob_file)->header_);
805 if (!s.ok()) {
806 ROCKS_LOG_ERROR(db_options_.info_log,
807 "Failed to write header to new blob file: %s"
808 " status: '%s'",
809 (*blob_file)->PathName().c_str(), s.ToString().c_str());
810 return s;
811 }
812
813 (*blob_file)->SetFileSize(BlobLogHeader::kSize);
814 total_blob_size_ += BlobLogHeader::kSize;
815
816 return s;
817 }
818
SelectBlobFile(std::shared_ptr<BlobFile> * blob_file)819 Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
820 assert(blob_file);
821
822 {
823 ReadLock rl(&mutex_);
824
825 if (open_non_ttl_file_) {
826 assert(!open_non_ttl_file_->Immutable());
827 *blob_file = open_non_ttl_file_;
828 return Status::OK();
829 }
830 }
831
832 // Check again
833 WriteLock wl(&mutex_);
834
835 if (open_non_ttl_file_) {
836 assert(!open_non_ttl_file_->Immutable());
837 *blob_file = open_non_ttl_file_;
838 return Status::OK();
839 }
840
841 std::shared_ptr<Writer> writer;
842 const Status s = CreateBlobFileAndWriter(
843 /* has_ttl */ false, ExpirationRange(),
844 /* reason */ "SelectBlobFile", blob_file, &writer);
845 if (!s.ok()) {
846 return s;
847 }
848
849 RegisterBlobFile(*blob_file);
850 open_non_ttl_file_ = *blob_file;
851
852 return s;
853 }
854
SelectBlobFileTTL(uint64_t expiration,std::shared_ptr<BlobFile> * blob_file)855 Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
856 std::shared_ptr<BlobFile>* blob_file) {
857 assert(blob_file);
858 assert(expiration != kNoExpiration);
859
860 {
861 ReadLock rl(&mutex_);
862
863 *blob_file = FindBlobFileLocked(expiration);
864 if (*blob_file != nullptr) {
865 assert(!(*blob_file)->Immutable());
866 return Status::OK();
867 }
868 }
869
870 // Check again
871 WriteLock wl(&mutex_);
872
873 *blob_file = FindBlobFileLocked(expiration);
874 if (*blob_file != nullptr) {
875 assert(!(*blob_file)->Immutable());
876 return Status::OK();
877 }
878
879 const uint64_t exp_low =
880 (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
881 const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
882 const ExpirationRange expiration_range(exp_low, exp_high);
883
884 std::ostringstream oss;
885 oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
886
887 std::shared_ptr<Writer> writer;
888 const Status s =
889 CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
890 /* reason */ oss.str(), blob_file, &writer);
891 if (!s.ok()) {
892 return s;
893 }
894
895 RegisterBlobFile(*blob_file);
896 open_ttl_files_.insert(*blob_file);
897
898 return s;
899 }
900
901 class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
902 private:
903 const WriteOptions& options_;
904 BlobDBImpl* blob_db_impl_;
905 uint32_t default_cf_id_;
906 WriteBatch batch_;
907
908 public:
BlobInserter(const WriteOptions & options,BlobDBImpl * blob_db_impl,uint32_t default_cf_id)909 BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
910 uint32_t default_cf_id)
911 : options_(options),
912 blob_db_impl_(blob_db_impl),
913 default_cf_id_(default_cf_id) {}
914
batch()915 WriteBatch* batch() { return &batch_; }
916
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)917 Status PutCF(uint32_t column_family_id, const Slice& key,
918 const Slice& value) override {
919 if (column_family_id != default_cf_id_) {
920 return Status::NotSupported(
921 "Blob DB doesn't support non-default column family.");
922 }
923 Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
924 &batch_);
925 return s;
926 }
927
DeleteCF(uint32_t column_family_id,const Slice & key)928 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
929 if (column_family_id != default_cf_id_) {
930 return Status::NotSupported(
931 "Blob DB doesn't support non-default column family.");
932 }
933 Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
934 return s;
935 }
936
DeleteRange(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)937 virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
938 const Slice& end_key) {
939 if (column_family_id != default_cf_id_) {
940 return Status::NotSupported(
941 "Blob DB doesn't support non-default column family.");
942 }
943 Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
944 begin_key, end_key);
945 return s;
946 }
947
SingleDeleteCF(uint32_t,const Slice &)948 Status SingleDeleteCF(uint32_t /*column_family_id*/,
949 const Slice& /*key*/) override {
950 return Status::NotSupported("Not supported operation in blob db.");
951 }
952
MergeCF(uint32_t,const Slice &,const Slice &)953 Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
954 const Slice& /*value*/) override {
955 return Status::NotSupported("Not supported operation in blob db.");
956 }
957
LogData(const Slice & blob)958 void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
959 };
960
Write(const WriteOptions & options,WriteBatch * updates)961 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
962 StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
963 RecordTick(statistics_, BLOB_DB_NUM_WRITE);
964 uint32_t default_cf_id =
965 reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
966 Status s;
967 BlobInserter blob_inserter(options, this, default_cf_id);
968 {
969 // Release write_mutex_ before DB write to avoid race condition with
970 // flush begin listener, which also require write_mutex_ to sync
971 // blob files.
972 MutexLock l(&write_mutex_);
973 s = updates->Iterate(&blob_inserter);
974 }
975 if (!s.ok()) {
976 return s;
977 }
978 return db_->Write(options, blob_inserter.batch());
979 }
980
Put(const WriteOptions & options,const Slice & key,const Slice & value)981 Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
982 const Slice& value) {
983 return PutUntil(options, key, value, kNoExpiration);
984 }
985
PutWithTTL(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t ttl)986 Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
987 const Slice& key, const Slice& value,
988 uint64_t ttl) {
989 uint64_t now = EpochNow();
990 uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
991 return PutUntil(options, key, value, expiration);
992 }
993
PutUntil(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t expiration)994 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
995 const Slice& value, uint64_t expiration) {
996 StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
997 RecordTick(statistics_, BLOB_DB_NUM_PUT);
998 Status s;
999 WriteBatch batch;
1000 {
1001 // Release write_mutex_ before DB write to avoid race condition with
1002 // flush begin listener, which also require write_mutex_ to sync
1003 // blob files.
1004 MutexLock l(&write_mutex_);
1005 s = PutBlobValue(options, key, value, expiration, &batch);
1006 }
1007 if (s.ok()) {
1008 s = db_->Write(options, &batch);
1009 }
1010 return s;
1011 }
1012
PutBlobValue(const WriteOptions &,const Slice & key,const Slice & value,uint64_t expiration,WriteBatch * batch)1013 Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
1014 const Slice& key, const Slice& value,
1015 uint64_t expiration, WriteBatch* batch) {
1016 write_mutex_.AssertHeld();
1017 Status s;
1018 std::string index_entry;
1019 uint32_t column_family_id =
1020 reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
1021 if (value.size() < bdb_options_.min_blob_size) {
1022 if (expiration == kNoExpiration) {
1023 // Put as normal value
1024 s = batch->Put(key, value);
1025 RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
1026 } else {
1027 // Inlined with TTL
1028 BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
1029 s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1030 index_entry);
1031 RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
1032 }
1033 } else {
1034 std::string compression_output;
1035 Slice value_compressed = GetCompressedSlice(value, &compression_output);
1036
1037 std::string headerbuf;
1038 Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
1039
1040 // Check DB size limit before selecting blob file to
1041 // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
1042 // done before calling SelectBlobFile().
1043 s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
1044 value_compressed.size());
1045 if (!s.ok()) {
1046 return s;
1047 }
1048
1049 std::shared_ptr<BlobFile> blob_file;
1050 if (expiration != kNoExpiration) {
1051 s = SelectBlobFileTTL(expiration, &blob_file);
1052 } else {
1053 s = SelectBlobFile(&blob_file);
1054 }
1055 if (s.ok()) {
1056 assert(blob_file != nullptr);
1057 assert(blob_file->GetCompressionType() == bdb_options_.compression);
1058 s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
1059 &index_entry);
1060 }
1061 if (s.ok()) {
1062 if (expiration != kNoExpiration) {
1063 WriteLock file_lock(&blob_file->mutex_);
1064 blob_file->ExtendExpirationRange(expiration);
1065 }
1066 s = CloseBlobFileIfNeeded(blob_file);
1067 }
1068 if (s.ok()) {
1069 s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1070 index_entry);
1071 }
1072 if (s.ok()) {
1073 if (expiration == kNoExpiration) {
1074 RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
1075 } else {
1076 RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
1077 }
1078 } else {
1079 ROCKS_LOG_ERROR(
1080 db_options_.info_log,
1081 "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
1082 " status: '%s' blob_file: '%s'",
1083 blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
1084 s.ToString().c_str(), blob_file->DumpState().c_str());
1085 }
1086 }
1087
1088 RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
1089 RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
1090 RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
1091 RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
1092
1093 return s;
1094 }
1095
// Returns `raw` compressed with bdb_options_.compression, or `raw` itself when
// compression is disabled. `compression_output` owns the compressed bytes and
// must outlive the returned Slice.
//
// NOTE(review): the value returned by CompressBlock() and its update to the
// local `type` are ignored; *compression_output is returned as-is. Confirm
// this is intended for the cases where CompressBlock() falls back to
// uncompressed output (e.g. unsupported type or poor ratio).
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
                                     std::string* compression_output) const {
  const CompressionType configured_type = bdb_options_.compression;
  if (configured_type == kNoCompression) {
    return raw;
  }

  StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
  CompressionType type = configured_type;
  CompressionOptions opts;
  CompressionContext context(type);
  CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
                       0 /* sample_for_compression */);
  CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
                compression_output, nullptr, nullptr);
  return Slice(*compression_output);
}
1111
CompactFiles(const CompactionOptions & compact_options,const std::vector<std::string> & input_file_names,const int output_level,const int output_path_id,std::vector<std::string> * const output_file_names,CompactionJobInfo * compaction_job_info)1112 Status BlobDBImpl::CompactFiles(
1113 const CompactionOptions& compact_options,
1114 const std::vector<std::string>& input_file_names, const int output_level,
1115 const int output_path_id, std::vector<std::string>* const output_file_names,
1116 CompactionJobInfo* compaction_job_info) {
1117 // Note: we need CompactionJobInfo to be able to track updates to the
1118 // blob file <-> SST mappings, so we provide one if the user hasn't,
1119 // assuming that GC is enabled.
1120 CompactionJobInfo info{};
1121 if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
1122 compaction_job_info = &info;
1123 }
1124
1125 const Status s =
1126 db_->CompactFiles(compact_options, input_file_names, output_level,
1127 output_path_id, output_file_names, compaction_job_info);
1128 if (!s.ok()) {
1129 return s;
1130 }
1131
1132 if (bdb_options_.enable_garbage_collection) {
1133 assert(compaction_job_info);
1134 ProcessCompactionJobInfo(*compaction_job_info);
1135 }
1136
1137 return s;
1138 }
1139
GetCompactionContextCommon(BlobCompactionContext * context) const1140 void BlobDBImpl::GetCompactionContextCommon(
1141 BlobCompactionContext* context) const {
1142 assert(context);
1143
1144 context->next_file_number = next_file_number_.load();
1145 context->current_blob_files.clear();
1146 for (auto& p : blob_files_) {
1147 context->current_blob_files.insert(p.first);
1148 }
1149 context->fifo_eviction_seq = fifo_eviction_seq_;
1150 context->evict_expiration_up_to = evict_expiration_up_to_;
1151 }
1152
GetCompactionContext(BlobCompactionContext * context)1153 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
1154 assert(context);
1155
1156 ReadLock l(&mutex_);
1157 GetCompactionContextCommon(context);
1158 }
1159
GetCompactionContext(BlobCompactionContext * context,BlobCompactionContextGC * context_gc)1160 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
1161 BlobCompactionContextGC* context_gc) {
1162 assert(context);
1163 assert(context_gc);
1164
1165 ReadLock l(&mutex_);
1166 GetCompactionContextCommon(context);
1167
1168 context_gc->blob_db_impl = this;
1169
1170 if (!live_imm_non_ttl_blob_files_.empty()) {
1171 auto it = live_imm_non_ttl_blob_files_.begin();
1172 std::advance(it, bdb_options_.garbage_collection_cutoff *
1173 live_imm_non_ttl_blob_files_.size());
1174 context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
1175 ? it->first
1176 : std::numeric_limits<uint64_t>::max();
1177 }
1178 }
1179
UpdateLiveSSTSize()1180 void BlobDBImpl::UpdateLiveSSTSize() {
1181 uint64_t live_sst_size = 0;
1182 bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
1183 if (ok) {
1184 live_sst_size_.store(live_sst_size);
1185 ROCKS_LOG_INFO(db_options_.info_log,
1186 "Updated total SST file size: %" PRIu64 " bytes.",
1187 live_sst_size);
1188 } else {
1189 ROCKS_LOG_ERROR(
1190 db_options_.info_log,
1191 "Failed to update total SST file size after flush or compaction.");
1192 }
1193 {
1194 // Trigger FIFO eviction if needed.
1195 MutexLock l(&write_mutex_);
1196 Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
1197 if (s.IsNoSpace()) {
1198 ROCKS_LOG_WARN(db_options_.info_log,
1199 "DB grow out-of-space after SST size updated. Current live"
1200 " SST size: %" PRIu64
1201 " , current blob files size: %" PRIu64 ".",
1202 live_sst_size_.load(), total_blob_size_.load());
1203 }
1204 }
1205 }
1206
CheckSizeAndEvictBlobFiles(uint64_t blob_size,bool force_evict)1207 Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
1208 bool force_evict) {
1209 write_mutex_.AssertHeld();
1210
1211 uint64_t live_sst_size = live_sst_size_.load();
1212 if (bdb_options_.max_db_size == 0 ||
1213 live_sst_size + total_blob_size_.load() + blob_size <=
1214 bdb_options_.max_db_size) {
1215 return Status::OK();
1216 }
1217
1218 if (bdb_options_.is_fifo == false ||
1219 (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
1220 // FIFO eviction is disabled, or no space to insert new blob even we evict
1221 // all blob files.
1222 return Status::NoSpace(
1223 "Write failed, as writing it would exceed max_db_size limit.");
1224 }
1225
1226 std::vector<std::shared_ptr<BlobFile>> candidate_files;
1227 CopyBlobFiles(&candidate_files);
1228 std::sort(candidate_files.begin(), candidate_files.end(),
1229 BlobFileComparator());
1230 fifo_eviction_seq_ = GetLatestSequenceNumber();
1231
1232 WriteLock l(&mutex_);
1233
1234 while (!candidate_files.empty() &&
1235 live_sst_size + total_blob_size_.load() + blob_size >
1236 bdb_options_.max_db_size) {
1237 std::shared_ptr<BlobFile> blob_file = candidate_files.back();
1238 candidate_files.pop_back();
1239 WriteLock file_lock(&blob_file->mutex_);
1240 if (blob_file->Obsolete()) {
1241 // File already obsoleted by someone else.
1242 assert(blob_file->Immutable());
1243 continue;
1244 }
1245 // FIFO eviction can evict open blob files.
1246 if (!blob_file->Immutable()) {
1247 Status s = CloseBlobFile(blob_file);
1248 if (!s.ok()) {
1249 return s;
1250 }
1251 }
1252 assert(blob_file->Immutable());
1253 auto expiration_range = blob_file->GetExpirationRange();
1254 ROCKS_LOG_INFO(db_options_.info_log,
1255 "Evict oldest blob file since DB out of space. Current "
1256 "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
1257 ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
1258 ".",
1259 live_sst_size, total_blob_size_.load(),
1260 bdb_options_.max_db_size, blob_file->BlobFileNumber());
1261 ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
1262 evict_expiration_up_to_ = expiration_range.first;
1263 RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
1264 RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
1265 blob_file->BlobCount());
1266 RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
1267 blob_file->GetFileSize());
1268 TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
1269 }
1270 if (live_sst_size + total_blob_size_.load() + blob_size >
1271 bdb_options_.max_db_size) {
1272 return Status::NoSpace(
1273 "Write failed, as writing it would exceed max_db_size limit.");
1274 }
1275 return Status::OK();
1276 }
1277
AppendBlob(const std::shared_ptr<BlobFile> & bfile,const std::string & headerbuf,const Slice & key,const Slice & value,uint64_t expiration,std::string * index_entry)1278 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
1279 const std::string& headerbuf, const Slice& key,
1280 const Slice& value, uint64_t expiration,
1281 std::string* index_entry) {
1282 Status s;
1283 uint64_t blob_offset = 0;
1284 uint64_t key_offset = 0;
1285 {
1286 WriteLock lockbfile_w(&bfile->mutex_);
1287 std::shared_ptr<Writer> writer;
1288 s = CheckOrCreateWriterLocked(bfile, &writer);
1289 if (!s.ok()) {
1290 return s;
1291 }
1292
1293 // write the blob to the blob log.
1294 s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
1295 &blob_offset);
1296 }
1297
1298 if (!s.ok()) {
1299 ROCKS_LOG_ERROR(db_options_.info_log,
1300 "Invalid status in AppendBlob: %s status: '%s'",
1301 bfile->PathName().c_str(), s.ToString().c_str());
1302 return s;
1303 }
1304
1305 uint64_t size_put = headerbuf.size() + key.size() + value.size();
1306 bfile->BlobRecordAdded(size_put);
1307 total_blob_size_ += size_put;
1308
1309 if (expiration == kNoExpiration) {
1310 BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
1311 value.size(), bdb_options_.compression);
1312 } else {
1313 BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
1314 blob_offset, value.size(),
1315 bdb_options_.compression);
1316 }
1317
1318 return s;
1319 }
1320
MultiGet(const ReadOptions & read_options,const std::vector<Slice> & keys,std::vector<std::string> * values)1321 std::vector<Status> BlobDBImpl::MultiGet(
1322 const ReadOptions& read_options,
1323 const std::vector<Slice>& keys, std::vector<std::string>* values) {
1324 StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
1325 RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
1326 // Get a snapshot to avoid blob file get deleted between we
1327 // fetch and index entry and reading from the file.
1328 ReadOptions ro(read_options);
1329 bool snapshot_created = SetSnapshotIfNeeded(&ro);
1330
1331 std::vector<Status> statuses;
1332 statuses.reserve(keys.size());
1333 values->clear();
1334 values->reserve(keys.size());
1335 PinnableSlice value;
1336 for (size_t i = 0; i < keys.size(); i++) {
1337 statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
1338 values->push_back(value.ToString());
1339 value.Reset();
1340 }
1341 if (snapshot_created) {
1342 db_->ReleaseSnapshot(ro.snapshot);
1343 }
1344 return statuses;
1345 }
1346
SetSnapshotIfNeeded(ReadOptions * read_options)1347 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
1348 assert(read_options != nullptr);
1349 if (read_options->snapshot != nullptr) {
1350 return false;
1351 }
1352 read_options->snapshot = db_->GetSnapshot();
1353 return true;
1354 }
1355
GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value,uint64_t * expiration)1356 Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
1357 PinnableSlice* value, uint64_t* expiration) {
1358 assert(value);
1359
1360 BlobIndex blob_index;
1361 Status s = blob_index.DecodeFrom(index_entry);
1362 if (!s.ok()) {
1363 return s;
1364 }
1365
1366 if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
1367 return Status::NotFound("Key expired");
1368 }
1369
1370 if (expiration != nullptr) {
1371 if (blob_index.HasTTL()) {
1372 *expiration = blob_index.expiration();
1373 } else {
1374 *expiration = kNoExpiration;
1375 }
1376 }
1377
1378 if (blob_index.IsInlined()) {
1379 // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
1380 // memory buffer to avoid extra copy.
1381 value->PinSelf(blob_index.value());
1382 return Status::OK();
1383 }
1384
1385 CompressionType compression_type = kNoCompression;
1386 s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
1387 blob_index.size(), value, &compression_type);
1388 if (!s.ok()) {
1389 return s;
1390 }
1391
1392 if (compression_type != kNoCompression) {
1393 BlockContents contents;
1394 auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
1395
1396 {
1397 StopWatch decompression_sw(env_, statistics_,
1398 BLOB_DB_DECOMPRESSION_MICROS);
1399 UncompressionContext context(compression_type);
1400 UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
1401 compression_type);
1402 s = UncompressBlockContentsForCompressionType(
1403 info, value->data(), value->size(), &contents,
1404 kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
1405 }
1406
1407 if (!s.ok()) {
1408 if (debug_level_ >= 2) {
1409 ROCKS_LOG_ERROR(
1410 db_options_.info_log,
1411 "Uncompression error during blob read from file: %" PRIu64
1412 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1413 " key: %s status: '%s'",
1414 blob_index.file_number(), blob_index.offset(), blob_index.size(),
1415 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1416 }
1417
1418 return Status::Corruption("Unable to uncompress blob.");
1419 }
1420
1421 value->PinSelf(contents.data);
1422 }
1423
1424 return Status::OK();
1425 }
1426
GetRawBlobFromFile(const Slice & key,uint64_t file_number,uint64_t offset,uint64_t size,PinnableSlice * value,CompressionType * compression_type)1427 Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
1428 uint64_t offset, uint64_t size,
1429 PinnableSlice* value,
1430 CompressionType* compression_type) {
1431 assert(value);
1432 assert(compression_type);
1433 assert(*compression_type == kNoCompression);
1434
1435 if (!size) {
1436 value->PinSelf("");
1437 return Status::OK();
1438 }
1439
1440 // offset has to have certain min, as we will read CRC
1441 // later from the Blob Header, which needs to be also a
1442 // valid offset.
1443 if (offset <
1444 (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
1445 if (debug_level_ >= 2) {
1446 ROCKS_LOG_ERROR(db_options_.info_log,
1447 "Invalid blob index file_number: %" PRIu64
1448 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1449 " key: %s",
1450 file_number, offset, size,
1451 key.ToString(/* output_hex */ true).c_str());
1452 }
1453
1454 return Status::NotFound("Invalid blob offset");
1455 }
1456
1457 std::shared_ptr<BlobFile> blob_file;
1458
1459 {
1460 ReadLock rl(&mutex_);
1461 auto it = blob_files_.find(file_number);
1462
1463 // file was deleted
1464 if (it == blob_files_.end()) {
1465 return Status::NotFound("Blob Not Found as blob file missing");
1466 }
1467
1468 blob_file = it->second;
1469 }
1470
1471 *compression_type = blob_file->GetCompressionType();
1472
1473 // takes locks when called
1474 std::shared_ptr<RandomAccessFileReader> reader;
1475 Status s = GetBlobFileReader(blob_file, &reader);
1476 if (!s.ok()) {
1477 return s;
1478 }
1479
1480 assert(offset >= key.size() + sizeof(uint32_t));
1481 const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
1482 const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
1483
1484 // Allocate the buffer. This is safe in C++11
1485 std::string buf;
1486 AlignedBuf aligned_buf;
1487
1488 // A partial blob record contain checksum, key and value.
1489 Slice blob_record;
1490
1491 {
1492 StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
1493 if (reader->use_direct_io()) {
1494 s = reader->Read(record_offset, static_cast<size_t>(record_size),
1495 &blob_record, nullptr, &aligned_buf);
1496 } else {
1497 buf.reserve(static_cast<size_t>(record_size));
1498 s = reader->Read(record_offset, static_cast<size_t>(record_size),
1499 &blob_record, &buf[0], nullptr);
1500 }
1501 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
1502 }
1503
1504 if (!s.ok()) {
1505 ROCKS_LOG_DEBUG(
1506 db_options_.info_log,
1507 "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1508 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
1509 file_number, offset, size, key.size(), s.ToString().c_str());
1510 return s;
1511 }
1512
1513 if (blob_record.size() != record_size) {
1514 ROCKS_LOG_DEBUG(
1515 db_options_.info_log,
1516 "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1517 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
1518 ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
1519 file_number, offset, size, key.size(), blob_record.size(), record_size);
1520
1521 return Status::Corruption("Failed to retrieve blob from blob index.");
1522 }
1523
1524 Slice crc_slice(blob_record.data(), sizeof(uint32_t));
1525 Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
1526 static_cast<size_t>(size));
1527
1528 uint32_t crc_exp = 0;
1529 if (!GetFixed32(&crc_slice, &crc_exp)) {
1530 ROCKS_LOG_DEBUG(
1531 db_options_.info_log,
1532 "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
1533 ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
1534 file_number, offset, size, key.size(), s.ToString().c_str());
1535 return Status::Corruption("Unable to decode checksum.");
1536 }
1537
1538 uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
1539 blob_record.size() - sizeof(uint32_t));
1540 crc = crc32c::Mask(crc); // Adjust for storage
1541 if (crc != crc_exp) {
1542 if (debug_level_ >= 2) {
1543 ROCKS_LOG_ERROR(
1544 db_options_.info_log,
1545 "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
1546 " blob_size: %" PRIu64 " key: %s status: '%s'",
1547 file_number, offset, size,
1548 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1549 }
1550
1551 return Status::Corruption("Corruption. Blob CRC mismatch");
1552 }
1553
1554 value->PinSelf(blob_value);
1555
1556 return Status::OK();
1557 }
1558
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)1559 Status BlobDBImpl::Get(const ReadOptions& read_options,
1560 ColumnFamilyHandle* column_family, const Slice& key,
1561 PinnableSlice* value) {
1562 return Get(read_options, column_family, key, value,
1563 static_cast<uint64_t*>(nullptr) /*expiration*/);
1564 }
1565
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1566 Status BlobDBImpl::Get(const ReadOptions& read_options,
1567 ColumnFamilyHandle* column_family, const Slice& key,
1568 PinnableSlice* value, uint64_t* expiration) {
1569 StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
1570 RecordTick(statistics_, BLOB_DB_NUM_GET);
1571 return GetImpl(read_options, column_family, key, value, expiration);
1572 }
1573
GetImpl(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1574 Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
1575 ColumnFamilyHandle* column_family, const Slice& key,
1576 PinnableSlice* value, uint64_t* expiration) {
1577 if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
1578 return Status::NotSupported(
1579 "Blob DB doesn't support non-default column family.");
1580 }
1581 // Get a snapshot to avoid blob file get deleted between we
1582 // fetch and index entry and reading from the file.
1583 // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1584 ReadOptions ro(read_options);
1585 bool snapshot_created = SetSnapshotIfNeeded(&ro);
1586
1587 PinnableSlice index_entry;
1588 Status s;
1589 bool is_blob_index = false;
1590 DBImpl::GetImplOptions get_impl_options;
1591 get_impl_options.column_family = column_family;
1592 get_impl_options.value = &index_entry;
1593 get_impl_options.is_blob_index = &is_blob_index;
1594 s = db_impl_->GetImpl(ro, key, get_impl_options);
1595 if (expiration != nullptr) {
1596 *expiration = kNoExpiration;
1597 }
1598 RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
1599 if (s.ok()) {
1600 if (is_blob_index) {
1601 s = GetBlobValue(key, index_entry, value, expiration);
1602 } else {
1603 // The index entry is the value itself in this case.
1604 value->PinSelf(index_entry);
1605 }
1606 RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
1607 }
1608 if (snapshot_created) {
1609 db_->ReleaseSnapshot(ro.snapshot);
1610 }
1611 return s;
1612 }
1613
SanityCheck(bool aborted)1614 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
1615 if (aborted) {
1616 return std::make_pair(false, -1);
1617 }
1618
1619 ReadLock rl(&mutex_);
1620
1621 ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
1622 ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
1623 blob_files_.size());
1624 ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
1625 open_ttl_files_.size());
1626
1627 for (const auto& blob_file : open_ttl_files_) {
1628 (void)blob_file;
1629 assert(!blob_file->Immutable());
1630 }
1631
1632 for (const auto& pair : live_imm_non_ttl_blob_files_) {
1633 const auto& blob_file = pair.second;
1634 (void)blob_file;
1635 assert(!blob_file->HasTTL());
1636 assert(blob_file->Immutable());
1637 }
1638
1639 uint64_t now = EpochNow();
1640
1641 for (auto blob_file_pair : blob_files_) {
1642 auto blob_file = blob_file_pair.second;
1643 char buf[1000];
1644 int pos = snprintf(buf, sizeof(buf),
1645 "Blob file %" PRIu64 ", size %" PRIu64
1646 ", blob count %" PRIu64 ", immutable %d",
1647 blob_file->BlobFileNumber(), blob_file->GetFileSize(),
1648 blob_file->BlobCount(), blob_file->Immutable());
1649 if (blob_file->HasTTL()) {
1650 ExpirationRange expiration_range;
1651
1652 {
1653 ReadLock file_lock(&blob_file->mutex_);
1654 expiration_range = blob_file->GetExpirationRange();
1655 }
1656
1657 pos += snprintf(buf + pos, sizeof(buf) - pos,
1658 ", expiration range (%" PRIu64 ", %" PRIu64 ")",
1659 expiration_range.first, expiration_range.second);
1660 if (!blob_file->Obsolete()) {
1661 pos += snprintf(buf + pos, sizeof(buf) - pos,
1662 ", expire in %" PRIu64 " seconds",
1663 expiration_range.second - now);
1664 }
1665 }
1666 if (blob_file->Obsolete()) {
1667 pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
1668 blob_file->GetObsoleteSequence());
1669 }
1670 snprintf(buf + pos, sizeof(buf) - pos, ".");
1671 ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
1672 }
1673
1674 // reschedule
1675 return std::make_pair(true, -1);
1676 }
1677
CloseBlobFile(std::shared_ptr<BlobFile> bfile)1678 Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
1679 assert(bfile);
1680 assert(!bfile->Immutable());
1681 assert(!bfile->Obsolete());
1682
1683 if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
1684 write_mutex_.AssertHeld();
1685 }
1686
1687 ROCKS_LOG_INFO(db_options_.info_log,
1688 "Closing blob file %" PRIu64 ". Path: %s",
1689 bfile->BlobFileNumber(), bfile->PathName().c_str());
1690
1691 const SequenceNumber sequence = GetLatestSequenceNumber();
1692
1693 const Status s = bfile->WriteFooterAndCloseLocked(sequence);
1694
1695 if (s.ok()) {
1696 total_blob_size_ += BlobLogFooter::kSize;
1697 } else {
1698 bfile->MarkImmutable(sequence);
1699
1700 ROCKS_LOG_ERROR(db_options_.info_log,
1701 "Failed to close blob file %" PRIu64 "with error: %s",
1702 bfile->BlobFileNumber(), s.ToString().c_str());
1703 }
1704
1705 if (bfile->HasTTL()) {
1706 size_t erased __attribute__((__unused__));
1707 erased = open_ttl_files_.erase(bfile);
1708 } else {
1709 if (bfile == open_non_ttl_file_) {
1710 open_non_ttl_file_ = nullptr;
1711 }
1712
1713 const uint64_t blob_file_number = bfile->BlobFileNumber();
1714 auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
1715 assert(it == live_imm_non_ttl_blob_files_.end() ||
1716 it->first != blob_file_number);
1717 live_imm_non_ttl_blob_files_.insert(
1718 it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
1719 blob_file_number, bfile));
1720 }
1721
1722 return s;
1723 }
1724
CloseBlobFileIfNeeded(std::shared_ptr<BlobFile> & bfile)1725 Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
1726 write_mutex_.AssertHeld();
1727
1728 // atomic read
1729 if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
1730 return Status::OK();
1731 }
1732
1733 WriteLock lock(&mutex_);
1734 WriteLock file_lock(&bfile->mutex_);
1735
1736 assert(!bfile->Obsolete() || bfile->Immutable());
1737 if (bfile->Immutable()) {
1738 return Status::OK();
1739 }
1740
1741 return CloseBlobFile(bfile);
1742 }
1743
ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,SequenceNumber obsolete_seq,bool update_size)1744 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
1745 SequenceNumber obsolete_seq,
1746 bool update_size) {
1747 assert(blob_file->Immutable());
1748 assert(!blob_file->Obsolete());
1749
1750 // Should hold write lock of mutex_ or during DB open.
1751 blob_file->MarkObsolete(obsolete_seq);
1752 obsolete_files_.push_back(blob_file);
1753 assert(total_blob_size_.load() >= blob_file->GetFileSize());
1754 if (update_size) {
1755 total_blob_size_ -= blob_file->GetFileSize();
1756 }
1757 }
1758
VisibleToActiveSnapshot(const std::shared_ptr<BlobFile> & bfile)1759 bool BlobDBImpl::VisibleToActiveSnapshot(
1760 const std::shared_ptr<BlobFile>& bfile) {
1761 assert(bfile->Obsolete());
1762
1763 // We check whether the oldest snapshot is no less than the last sequence
1764 // by the time the blob file become obsolete. If so, the blob file is not
1765 // visible to all existing snapshots.
1766 //
1767 // If we keep track of the earliest sequence of the keys in the blob file,
1768 // we could instead check if there's a snapshot falls in range
1769 // [earliest_sequence, obsolete_sequence). But doing so will make the
1770 // implementation more complicated.
1771 SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
1772 SequenceNumber oldest_snapshot = kMaxSequenceNumber;
1773 {
1774 // Need to lock DBImpl mutex before access snapshot list.
1775 InstrumentedMutexLock l(db_impl_->mutex());
1776 auto& snapshots = db_impl_->snapshots();
1777 if (!snapshots.empty()) {
1778 oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
1779 }
1780 }
1781 bool visible = oldest_snapshot < obsolete_sequence;
1782 if (visible) {
1783 ROCKS_LOG_INFO(db_options_.info_log,
1784 "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
1785 ") visible to oldest snapshot %" PRIu64 ".",
1786 bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
1787 }
1788 return visible;
1789 }
1790
EvictExpiredFiles(bool aborted)1791 std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
1792 if (aborted) {
1793 return std::make_pair(false, -1);
1794 }
1795
1796 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
1797 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
1798
1799 std::vector<std::shared_ptr<BlobFile>> process_files;
1800 uint64_t now = EpochNow();
1801 {
1802 ReadLock rl(&mutex_);
1803 for (auto p : blob_files_) {
1804 auto& blob_file = p.second;
1805 ReadLock file_lock(&blob_file->mutex_);
1806 if (blob_file->HasTTL() && !blob_file->Obsolete() &&
1807 blob_file->GetExpirationRange().second <= now) {
1808 process_files.push_back(blob_file);
1809 }
1810 }
1811 }
1812
1813 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
1814 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
1815 TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
1816
1817 SequenceNumber seq = GetLatestSequenceNumber();
1818 {
1819 MutexLock l(&write_mutex_);
1820 WriteLock lock(&mutex_);
1821 for (auto& blob_file : process_files) {
1822 WriteLock file_lock(&blob_file->mutex_);
1823
1824 // Need to double check if the file is obsolete.
1825 if (blob_file->Obsolete()) {
1826 assert(blob_file->Immutable());
1827 continue;
1828 }
1829
1830 if (!blob_file->Immutable()) {
1831 CloseBlobFile(blob_file);
1832 }
1833
1834 assert(blob_file->Immutable());
1835
1836 ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
1837 }
1838 }
1839
1840 return std::make_pair(true, -1);
1841 }
1842
SyncBlobFiles()1843 Status BlobDBImpl::SyncBlobFiles() {
1844 MutexLock l(&write_mutex_);
1845
1846 std::vector<std::shared_ptr<BlobFile>> process_files;
1847 {
1848 ReadLock rl(&mutex_);
1849 for (auto fitr : open_ttl_files_) {
1850 process_files.push_back(fitr);
1851 }
1852 if (open_non_ttl_file_ != nullptr) {
1853 process_files.push_back(open_non_ttl_file_);
1854 }
1855 }
1856
1857 Status s;
1858 for (auto& blob_file : process_files) {
1859 s = blob_file->Fsync();
1860 if (!s.ok()) {
1861 ROCKS_LOG_ERROR(db_options_.info_log,
1862 "Failed to sync blob file %" PRIu64 ", status: %s",
1863 blob_file->BlobFileNumber(), s.ToString().c_str());
1864 return s;
1865 }
1866 }
1867
1868 s = dir_ent_->Fsync();
1869 if (!s.ok()) {
1870 ROCKS_LOG_ERROR(db_options_.info_log,
1871 "Failed to sync blob directory, status: %s",
1872 s.ToString().c_str());
1873 }
1874 return s;
1875 }
1876
ReclaimOpenFiles(bool aborted)1877 std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
1878 if (aborted) return std::make_pair(false, -1);
1879
1880 if (open_file_count_.load() < kOpenFilesTrigger) {
1881 return std::make_pair(true, -1);
1882 }
1883
1884 // in the future, we should sort by last_access_
1885 // instead of closing every file
1886 ReadLock rl(&mutex_);
1887 for (auto const& ent : blob_files_) {
1888 auto bfile = ent.second;
1889 if (bfile->last_access_.load() == -1) continue;
1890
1891 WriteLock lockbfile_w(&bfile->mutex_);
1892 CloseRandomAccessLocked(bfile);
1893 }
1894
1895 return std::make_pair(true, -1);
1896 }
1897
DeleteObsoleteFiles(bool aborted)1898 std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
1899 if (aborted) {
1900 return std::make_pair(false, -1);
1901 }
1902
1903 MutexLock delete_file_lock(&delete_file_mutex_);
1904 if (disable_file_deletions_ > 0) {
1905 return std::make_pair(true, -1);
1906 }
1907
1908 std::list<std::shared_ptr<BlobFile>> tobsolete;
1909 {
1910 WriteLock wl(&mutex_);
1911 if (obsolete_files_.empty()) {
1912 return std::make_pair(true, -1);
1913 }
1914 tobsolete.swap(obsolete_files_);
1915 }
1916
1917 bool file_deleted = false;
1918 for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
1919 auto bfile = *iter;
1920 {
1921 ReadLock lockbfile_r(&bfile->mutex_);
1922 if (VisibleToActiveSnapshot(bfile)) {
1923 ROCKS_LOG_INFO(db_options_.info_log,
1924 "Could not delete file due to snapshot failure %s",
1925 bfile->PathName().c_str());
1926 ++iter;
1927 continue;
1928 }
1929 }
1930 ROCKS_LOG_INFO(db_options_.info_log,
1931 "Will delete file due to snapshot success %s",
1932 bfile->PathName().c_str());
1933
1934 {
1935 WriteLock wl(&mutex_);
1936 blob_files_.erase(bfile->BlobFileNumber());
1937 }
1938
1939 Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
1940 bfile->PathName(), blob_dir_, true,
1941 /*force_fg=*/false);
1942 if (!s.ok()) {
1943 ROCKS_LOG_ERROR(db_options_.info_log,
1944 "File failed to be deleted as obsolete %s",
1945 bfile->PathName().c_str());
1946 ++iter;
1947 continue;
1948 }
1949
1950 file_deleted = true;
1951 ROCKS_LOG_INFO(db_options_.info_log,
1952 "File deleted as obsolete from blob dir %s",
1953 bfile->PathName().c_str());
1954
1955 iter = tobsolete.erase(iter);
1956 }
1957
1958 // directory change. Fsync
1959 if (file_deleted) {
1960 Status s = dir_ent_->Fsync();
1961 if (!s.ok()) {
1962 ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
1963 blob_dir_.c_str(), s.ToString().c_str());
1964 }
1965 }
1966
1967 // put files back into obsolete if for some reason, delete failed
1968 if (!tobsolete.empty()) {
1969 WriteLock wl(&mutex_);
1970 for (auto bfile : tobsolete) {
1971 blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
1972 obsolete_files_.push_front(bfile);
1973 }
1974 }
1975
1976 return std::make_pair(!aborted, -1);
1977 }
1978
CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>> * bfiles_copy)1979 void BlobDBImpl::CopyBlobFiles(
1980 std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
1981 ReadLock rl(&mutex_);
1982 for (auto const& p : blob_files_) {
1983 bfiles_copy->push_back(p.second);
1984 }
1985 }
1986
NewIterator(const ReadOptions & read_options)1987 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
1988 auto* cfd =
1989 reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
1990 // Get a snapshot to avoid blob file get deleted between we
1991 // fetch and index entry and reading from the file.
1992 ManagedSnapshot* own_snapshot = nullptr;
1993 const Snapshot* snapshot = read_options.snapshot;
1994 if (snapshot == nullptr) {
1995 own_snapshot = new ManagedSnapshot(db_);
1996 snapshot = own_snapshot->snapshot();
1997 }
1998 auto* iter = db_impl_->NewIteratorImpl(
1999 read_options, cfd, snapshot->GetSequenceNumber(),
2000 nullptr /*read_callback*/, true /*allow_blob*/);
2001 return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
2002 }
2003
DestroyBlobDB(const std::string & dbname,const Options & options,const BlobDBOptions & bdb_options)2004 Status DestroyBlobDB(const std::string& dbname, const Options& options,
2005 const BlobDBOptions& bdb_options) {
2006 const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
2007 Env* env = soptions.env;
2008
2009 Status status;
2010 std::string blobdir;
2011 blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
2012 : bdb_options.blob_dir;
2013
2014 std::vector<std::string> filenames;
2015 env->GetChildren(blobdir, &filenames);
2016
2017 for (const auto& f : filenames) {
2018 uint64_t number;
2019 FileType type;
2020 if (ParseFileName(f, &number, &type) && type == kBlobFile) {
2021 Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
2022 /*force_fg=*/false);
2023 if (status.ok() && !del.ok()) {
2024 status = del;
2025 }
2026 }
2027 }
2028 env->DeleteDir(blobdir);
2029
2030 Status destroy = DestroyDB(dbname, options);
2031 if (status.ok() && !destroy.ok()) {
2032 status = destroy;
2033 }
2034
2035 return status;
2036 }
2037
2038 #ifndef NDEBUG
TEST_GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value)2039 Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
2040 PinnableSlice* value) {
2041 return GetBlobValue(key, index_entry, value);
2042 }
2043
TEST_AddDummyBlobFile(uint64_t blob_file_number,SequenceNumber immutable_sequence)2044 void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
2045 SequenceNumber immutable_sequence) {
2046 auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
2047 db_options_.info_log.get());
2048 blob_file->MarkImmutable(immutable_sequence);
2049
2050 blob_files_[blob_file_number] = blob_file;
2051 live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
2052 }
2053
TEST_GetBlobFiles() const2054 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
2055 ReadLock l(&mutex_);
2056 std::vector<std::shared_ptr<BlobFile>> blob_files;
2057 for (auto& p : blob_files_) {
2058 blob_files.emplace_back(p.second);
2059 }
2060 return blob_files;
2061 }
2062
TEST_GetLiveImmNonTTLFiles() const2063 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
2064 const {
2065 ReadLock l(&mutex_);
2066 std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
2067 for (const auto& pair : live_imm_non_ttl_blob_files_) {
2068 live_imm_non_ttl_files.emplace_back(pair.second);
2069 }
2070 return live_imm_non_ttl_files;
2071 }
2072
TEST_GetObsoleteFiles() const2073 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
2074 const {
2075 ReadLock l(&mutex_);
2076 std::vector<std::shared_ptr<BlobFile>> obsolete_files;
2077 for (auto& bfile : obsolete_files_) {
2078 obsolete_files.emplace_back(bfile);
2079 }
2080 return obsolete_files;
2081 }
2082
TEST_DeleteObsoleteFiles()2083 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
2084 DeleteObsoleteFiles(false /*abort*/);
2085 }
2086
TEST_CloseBlobFile(std::shared_ptr<BlobFile> & bfile)2087 Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
2088 MutexLock l(&write_mutex_);
2089 WriteLock lock(&mutex_);
2090 WriteLock file_lock(&bfile->mutex_);
2091
2092 return CloseBlobFile(bfile);
2093 }
2094
TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq,bool update_size)2095 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
2096 SequenceNumber obsolete_seq,
2097 bool update_size) {
2098 return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
2099 }
2100
TEST_EvictExpiredFiles()2101 void BlobDBImpl::TEST_EvictExpiredFiles() {
2102 EvictExpiredFiles(false /*abort*/);
2103 }
2104
TEST_live_sst_size()2105 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
2106
TEST_InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)2107 void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
2108 const std::vector<LiveFileMetaData>& live_files) {
2109 InitializeBlobFileToSstMapping(live_files);
2110 }
2111
TEST_ProcessFlushJobInfo(const FlushJobInfo & info)2112 void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
2113 ProcessFlushJobInfo(info);
2114 }
2115
TEST_ProcessCompactionJobInfo(const CompactionJobInfo & info)2116 void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
2117 ProcessCompactionJobInfo(info);
2118 }
2119
2120 #endif // !NDEBUG
2121
2122 } // namespace blob_db
2123 } // namespace ROCKSDB_NAMESPACE
2124 #endif // ROCKSDB_LITE
2125