1 
2 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
3 //  This source code is licensed under both the GPLv2 (found in the
4 //  COPYING file in the root directory) and Apache 2.0 License
5 //  (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/blob_db/blob_db_impl.h"
9 #include <algorithm>
10 #include <cinttypes>
11 #include <iomanip>
12 #include <memory>
13 #include <sstream>
14 
15 #include "db/blob/blob_index.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/write_batch_internal.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/file_util.h"
20 #include "file/filename.h"
21 #include "file/random_access_file_reader.h"
22 #include "file/sst_file_manager_impl.h"
23 #include "file/writable_file_writer.h"
24 #include "logging/logging.h"
25 #include "monitoring/instrumented_mutex.h"
26 #include "monitoring/statistics.h"
27 #include "rocksdb/convenience.h"
28 #include "rocksdb/env.h"
29 #include "rocksdb/iterator.h"
30 #include "rocksdb/utilities/stackable_db.h"
31 #include "rocksdb/utilities/transaction.h"
32 #include "table/block_based/block.h"
33 #include "table/block_based/block_based_table_builder.h"
34 #include "table/block_based/block_builder.h"
35 #include "table/meta_blocks.h"
36 #include "test_util/sync_point.h"
37 #include "util/cast_util.h"
38 #include "util/crc32c.h"
39 #include "util/mutexlock.h"
40 #include "util/random.h"
41 #include "util/stop_watch.h"
42 #include "util/timer_queue.h"
43 #include "utilities/blob_db/blob_compaction_filter.h"
44 #include "utilities/blob_db/blob_db_iterator.h"
45 #include "utilities/blob_db/blob_db_listener.h"
46 
namespace {
// File-local value (internal linkage via the unnamed namespace).
// NOTE(review): presumably a block-based table format version; no use is
// visible in this part of the file -- confirm before changing or removing.
int kBlockBasedTableVersionFormat{2};
}  // namespace
50 
51 namespace ROCKSDB_NAMESPACE {
52 namespace blob_db {
53 
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const54 bool BlobFileComparator::operator()(
55     const std::shared_ptr<BlobFile>& lhs,
56     const std::shared_ptr<BlobFile>& rhs) const {
57   return lhs->BlobFileNumber() > rhs->BlobFileNumber();
58 }
59 
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const60 bool BlobFileComparatorTTL::operator()(
61     const std::shared_ptr<BlobFile>& lhs,
62     const std::shared_ptr<BlobFile>& rhs) const {
63   assert(lhs->HasTTL() && rhs->HasTTL());
64   if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
65     return true;
66   }
67   if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
68     return false;
69   }
70   return lhs->BlobFileNumber() < rhs->BlobFileNumber();
71 }
72 
BlobDBImpl(const std::string & dbname,const BlobDBOptions & blob_db_options,const DBOptions & db_options,const ColumnFamilyOptions & cf_options)73 BlobDBImpl::BlobDBImpl(const std::string& dbname,
74                        const BlobDBOptions& blob_db_options,
75                        const DBOptions& db_options,
76                        const ColumnFamilyOptions& cf_options)
77     : BlobDB(),
78       dbname_(dbname),
79       db_impl_(nullptr),
80       env_(db_options.env),
81       bdb_options_(blob_db_options),
82       db_options_(db_options),
83       cf_options_(cf_options),
84       env_options_(db_options),
85       statistics_(db_options_.statistics.get()),
86       next_file_number_(1),
87       flush_sequence_(0),
88       closed_(true),
89       open_file_count_(0),
90       total_blob_size_(0),
91       live_sst_size_(0),
92       fifo_eviction_seq_(0),
93       evict_expiration_up_to_(0),
94       debug_level_(0) {
95   blob_dir_ = (bdb_options_.path_relative)
96                   ? dbname + "/" + bdb_options_.blob_dir
97                   : bdb_options_.blob_dir;
98   env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
99 }
100 
~BlobDBImpl()101 BlobDBImpl::~BlobDBImpl() {
102   tqueue_.shutdown();
103   // CancelAllBackgroundWork(db_, true);
104   Status s __attribute__((__unused__)) = Close();
105   assert(s.ok());
106 }
107 
Close()108 Status BlobDBImpl::Close() {
109   if (closed_) {
110     return Status::OK();
111   }
112   closed_ = true;
113 
114   // Close base DB before BlobDBImpl destructs to stop event listener and
115   // compaction filter call.
116   Status s = db_->Close();
117   // delete db_ anyway even if close failed.
118   delete db_;
119   // Reset pointers to avoid StackableDB delete the pointer again.
120   db_ = nullptr;
121   db_impl_ = nullptr;
122   if (!s.ok()) {
123     return s;
124   }
125 
126   s = SyncBlobFiles();
127   return s;
128 }
129 
GetBlobDBOptions() const130 BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
131 
Open(std::vector<ColumnFamilyHandle * > * handles)132 Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
133   assert(handles != nullptr);
134   assert(db_ == nullptr);
135 
136   if (blob_dir_.empty()) {
137     return Status::NotSupported("No blob directory in options");
138   }
139 
140   if (cf_options_.compaction_filter != nullptr ||
141       cf_options_.compaction_filter_factory != nullptr) {
142     return Status::NotSupported("Blob DB doesn't support compaction filter.");
143   }
144 
145   if (bdb_options_.garbage_collection_cutoff < 0.0 ||
146       bdb_options_.garbage_collection_cutoff > 1.0) {
147     return Status::InvalidArgument(
148         "Garbage collection cutoff must be in the interval [0.0, 1.0]");
149   }
150 
151   // Temporarily disable compactions in the base DB during open; save the user
152   // defined value beforehand so we can restore it once BlobDB is initialized.
153   // Note: this is only needed if garbage collection is enabled.
154   const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
155 
156   if (bdb_options_.enable_garbage_collection) {
157     cf_options_.disable_auto_compactions = true;
158   }
159 
160   Status s;
161 
162   // Create info log.
163   if (db_options_.info_log == nullptr) {
164     s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
165     if (!s.ok()) {
166       return s;
167     }
168   }
169 
170   ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
171 
172   // Open blob directory.
173   s = env_->CreateDirIfMissing(blob_dir_);
174   if (!s.ok()) {
175     ROCKS_LOG_ERROR(db_options_.info_log,
176                     "Failed to create blob_dir %s, status: %s",
177                     blob_dir_.c_str(), s.ToString().c_str());
178   }
179   s = env_->NewDirectory(blob_dir_, &dir_ent_);
180   if (!s.ok()) {
181     ROCKS_LOG_ERROR(db_options_.info_log,
182                     "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
183                     s.ToString().c_str());
184     return s;
185   }
186 
187   // Open blob files.
188   s = OpenAllBlobFiles();
189   if (!s.ok()) {
190     return s;
191   }
192 
193   // Update options
194   if (bdb_options_.enable_garbage_collection) {
195     db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
196     cf_options_.compaction_filter_factory =
197         std::make_shared<BlobIndexCompactionFilterFactoryGC>(this, env_,
198                                                              statistics_);
199   } else {
200     db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
201     cf_options_.compaction_filter_factory =
202         std::make_shared<BlobIndexCompactionFilterFactory>(this, env_,
203                                                            statistics_);
204   }
205 
206   // Open base db.
207   ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
208   s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
209   if (!s.ok()) {
210     return s;
211   }
212   db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
213 
214   // Initialize SST file <-> oldest blob file mapping if garbage collection
215   // is enabled.
216   if (bdb_options_.enable_garbage_collection) {
217     std::vector<LiveFileMetaData> live_files;
218     db_->GetLiveFilesMetaData(&live_files);
219 
220     InitializeBlobFileToSstMapping(live_files);
221 
222     MarkUnreferencedBlobFilesObsoleteDuringOpen();
223 
224     if (!disable_auto_compactions) {
225       s = db_->EnableAutoCompaction(*handles);
226       if (!s.ok()) {
227         ROCKS_LOG_ERROR(
228             db_options_.info_log,
229             "Failed to enable automatic compactions during open, status: %s",
230             s.ToString().c_str());
231         return s;
232       }
233     }
234   }
235 
236   // Add trash files in blob dir to file delete scheduler.
237   SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
238       db_impl_->immutable_db_options().sst_file_manager.get());
239   DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
240 
241   UpdateLiveSSTSize();
242 
243   // Start background jobs.
244   if (!bdb_options_.disable_background_tasks) {
245     StartBackgroundTasks();
246   }
247 
248   ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
249   bdb_options_.Dump(db_options_.info_log.get());
250   closed_ = false;
251   return s;
252 }
253 
StartBackgroundTasks()254 void BlobDBImpl::StartBackgroundTasks() {
255   // store a call to a member function and object
256   tqueue_.add(
257       kReclaimOpenFilesPeriodMillisecs,
258       std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
259   tqueue_.add(
260       kDeleteObsoleteFilesPeriodMillisecs,
261       std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
262   tqueue_.add(kSanityCheckPeriodMillisecs,
263               std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
264   tqueue_.add(
265       kEvictExpiredFilesPeriodMillisecs,
266       std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
267 }
268 
GetAllBlobFiles(std::set<uint64_t> * file_numbers)269 Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
270   assert(file_numbers != nullptr);
271   std::vector<std::string> all_files;
272   Status s = env_->GetChildren(blob_dir_, &all_files);
273   if (!s.ok()) {
274     ROCKS_LOG_ERROR(db_options_.info_log,
275                     "Failed to get list of blob files, status: %s",
276                     s.ToString().c_str());
277     return s;
278   }
279 
280   for (const auto& file_name : all_files) {
281     uint64_t file_number;
282     FileType type;
283     bool success = ParseFileName(file_name, &file_number, &type);
284     if (success && type == kBlobFile) {
285       file_numbers->insert(file_number);
286     } else {
287       ROCKS_LOG_WARN(db_options_.info_log,
288                      "Skipping file in blob directory: %s", file_name.c_str());
289     }
290   }
291 
292   return s;
293 }
294 
OpenAllBlobFiles()295 Status BlobDBImpl::OpenAllBlobFiles() {
296   std::set<uint64_t> file_numbers;
297   Status s = GetAllBlobFiles(&file_numbers);
298   if (!s.ok()) {
299     return s;
300   }
301 
302   if (!file_numbers.empty()) {
303     next_file_number_.store(*file_numbers.rbegin() + 1);
304   }
305 
306   std::ostringstream blob_file_oss;
307   std::ostringstream live_imm_oss;
308   std::ostringstream obsolete_file_oss;
309 
310   for (auto& file_number : file_numbers) {
311     std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
312         this, blob_dir_, file_number, db_options_.info_log.get());
313     blob_file->MarkImmutable(/* sequence */ 0);
314 
315     // Read file header and footer
316     Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
317     if (read_metadata_status.IsCorruption()) {
318       // Remove incomplete file.
319       if (!obsolete_files_.empty()) {
320         obsolete_file_oss << ", ";
321       }
322       obsolete_file_oss << file_number;
323 
324       ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
325       continue;
326     } else if (!read_metadata_status.ok()) {
327       ROCKS_LOG_ERROR(db_options_.info_log,
328                       "Unable to read metadata of blob file %" PRIu64
329                       ", status: '%s'",
330                       file_number, read_metadata_status.ToString().c_str());
331       return read_metadata_status;
332     }
333 
334     total_blob_size_ += blob_file->GetFileSize();
335 
336     if (!blob_files_.empty()) {
337       blob_file_oss << ", ";
338     }
339     blob_file_oss << file_number;
340 
341     blob_files_[file_number] = blob_file;
342 
343     if (!blob_file->HasTTL()) {
344       if (!live_imm_non_ttl_blob_files_.empty()) {
345         live_imm_oss << ", ";
346       }
347       live_imm_oss << file_number;
348 
349       live_imm_non_ttl_blob_files_[file_number] = blob_file;
350     }
351   }
352 
353   ROCKS_LOG_INFO(db_options_.info_log,
354                  "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
355                  blob_file_oss.str().c_str());
356   ROCKS_LOG_INFO(
357       db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
358       live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
359   ROCKS_LOG_INFO(db_options_.info_log,
360                  "Found %" ROCKSDB_PRIszt
361                  " incomplete or corrupted blob files: %s",
362                  obsolete_files_.size(), obsolete_file_oss.str().c_str());
363   return s;
364 }
365 
366 template <typename Linker>
LinkSstToBlobFileImpl(uint64_t sst_file_number,uint64_t blob_file_number,Linker linker)367 void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
368                                        uint64_t blob_file_number,
369                                        Linker linker) {
370   assert(bdb_options_.enable_garbage_collection);
371   assert(blob_file_number != kInvalidBlobFileNumber);
372 
373   auto it = blob_files_.find(blob_file_number);
374   if (it == blob_files_.end()) {
375     ROCKS_LOG_WARN(db_options_.info_log,
376                    "Blob file %" PRIu64
377                    " not found while trying to link "
378                    "SST file %" PRIu64,
379                    blob_file_number, sst_file_number);
380     return;
381   }
382 
383   BlobFile* const blob_file = it->second.get();
384   assert(blob_file);
385 
386   linker(blob_file, sst_file_number);
387 
388   ROCKS_LOG_INFO(db_options_.info_log,
389                  "Blob file %" PRIu64 " linked to SST file %" PRIu64,
390                  blob_file_number, sst_file_number);
391 }
392 
LinkSstToBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)393 void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
394                                    uint64_t blob_file_number) {
395   auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
396     WriteLock file_lock(&blob_file->mutex_);
397     blob_file->LinkSstFile(sst_file);
398   };
399 
400   LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
401 }
402 
LinkSstToBlobFileNoLock(uint64_t sst_file_number,uint64_t blob_file_number)403 void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
404                                          uint64_t blob_file_number) {
405   auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
406     blob_file->LinkSstFile(sst_file);
407   };
408 
409   LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
410 }
411 
UnlinkSstFromBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)412 void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
413                                        uint64_t blob_file_number) {
414   assert(bdb_options_.enable_garbage_collection);
415   assert(blob_file_number != kInvalidBlobFileNumber);
416 
417   auto it = blob_files_.find(blob_file_number);
418   if (it == blob_files_.end()) {
419     ROCKS_LOG_WARN(db_options_.info_log,
420                    "Blob file %" PRIu64
421                    " not found while trying to unlink "
422                    "SST file %" PRIu64,
423                    blob_file_number, sst_file_number);
424     return;
425   }
426 
427   BlobFile* const blob_file = it->second.get();
428   assert(blob_file);
429 
430   {
431     WriteLock file_lock(&blob_file->mutex_);
432     blob_file->UnlinkSstFile(sst_file_number);
433   }
434 
435   ROCKS_LOG_INFO(db_options_.info_log,
436                  "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
437                  blob_file_number, sst_file_number);
438 }
439 
InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)440 void BlobDBImpl::InitializeBlobFileToSstMapping(
441     const std::vector<LiveFileMetaData>& live_files) {
442   assert(bdb_options_.enable_garbage_collection);
443 
444   for (const auto& live_file : live_files) {
445     const uint64_t sst_file_number = live_file.file_number;
446     const uint64_t blob_file_number = live_file.oldest_blob_file_number;
447 
448     if (blob_file_number == kInvalidBlobFileNumber) {
449       continue;
450     }
451 
452     LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
453   }
454 }
455 
ProcessFlushJobInfo(const FlushJobInfo & info)456 void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
457   assert(bdb_options_.enable_garbage_collection);
458 
459   WriteLock lock(&mutex_);
460 
461   if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
462     LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
463   }
464 
465   assert(flush_sequence_ < info.largest_seqno);
466   flush_sequence_ = info.largest_seqno;
467 
468   MarkUnreferencedBlobFilesObsolete();
469 }
470 
ProcessCompactionJobInfo(const CompactionJobInfo & info)471 void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
472   assert(bdb_options_.enable_garbage_collection);
473 
474   if (!info.status.ok()) {
475     return;
476   }
477 
478   // Note: the same SST file may appear in both the input and the output
479   // file list in case of a trivial move. We walk through the two lists
480   // below in a fashion that's similar to merge sort to detect this.
481 
482   auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
483     return lhs.file_number < rhs.file_number;
484   };
485 
486   auto inputs = info.input_file_infos;
487   auto iit = inputs.begin();
488   const auto iit_end = inputs.end();
489 
490   std::sort(iit, iit_end, cmp);
491 
492   auto outputs = info.output_file_infos;
493   auto oit = outputs.begin();
494   const auto oit_end = outputs.end();
495 
496   std::sort(oit, oit_end, cmp);
497 
498   WriteLock lock(&mutex_);
499 
500   while (iit != iit_end && oit != oit_end) {
501     const auto& input = *iit;
502     const auto& output = *oit;
503 
504     if (input.file_number == output.file_number) {
505       ++iit;
506       ++oit;
507     } else if (input.file_number < output.file_number) {
508       if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
509         UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
510       }
511 
512       ++iit;
513     } else {
514       assert(output.file_number < input.file_number);
515 
516       if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
517         LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
518       }
519 
520       ++oit;
521     }
522   }
523 
524   while (iit != iit_end) {
525     const auto& input = *iit;
526 
527     if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
528       UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
529     }
530 
531     ++iit;
532   }
533 
534   while (oit != oit_end) {
535     const auto& output = *oit;
536 
537     if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
538       LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
539     }
540 
541     ++oit;
542   }
543 
544   MarkUnreferencedBlobFilesObsolete();
545 }
546 
MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq)547 bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
548     const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
549   assert(blob_file);
550   assert(!blob_file->HasTTL());
551   assert(blob_file->Immutable());
552   assert(bdb_options_.enable_garbage_collection);
553 
554   // Note: FIFO eviction could have marked this file obsolete already.
555   if (blob_file->Obsolete()) {
556     return true;
557   }
558 
559   // We cannot mark this file (or any higher-numbered files for that matter)
560   // obsolete if it is referenced by any memtables or SSTs. We keep track of
561   // the SSTs explicitly. To account for memtables, we keep track of the highest
562   // sequence number received in flush notifications, and we do not mark the
563   // blob file obsolete if there are still unflushed memtables from before
564   // the time the blob file was closed.
565   if (blob_file->GetImmutableSequence() > flush_sequence_ ||
566       !blob_file->GetLinkedSstFiles().empty()) {
567     return false;
568   }
569 
570   ROCKS_LOG_INFO(db_options_.info_log,
571                  "Blob file %" PRIu64 " is no longer needed, marking obsolete",
572                  blob_file->BlobFileNumber());
573 
574   ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
575   return true;
576 }
577 
578 template <class Functor>
MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed)579 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
580   assert(bdb_options_.enable_garbage_collection);
581 
582   // Iterate through all live immutable non-TTL blob files, and mark them
583   // obsolete assuming no SST files or memtables rely on the blobs in them.
584   // Note: we need to stop as soon as we find a blob file that has any
585   // linked SSTs (or one potentially referenced by memtables).
586 
587   uint64_t obsoleted_files = 0;
588 
589   auto it = live_imm_non_ttl_blob_files_.begin();
590   while (it != live_imm_non_ttl_blob_files_.end()) {
591     const auto& blob_file = it->second;
592     assert(blob_file);
593     assert(blob_file->BlobFileNumber() == it->first);
594     assert(!blob_file->HasTTL());
595     assert(blob_file->Immutable());
596 
597     // Small optimization: Obsolete() does an atomic read, so we can do
598     // this check without taking a lock on the blob file's mutex.
599     if (blob_file->Obsolete()) {
600       it = live_imm_non_ttl_blob_files_.erase(it);
601       continue;
602     }
603 
604     if (!mark_if_needed(blob_file)) {
605       break;
606     }
607 
608     it = live_imm_non_ttl_blob_files_.erase(it);
609 
610     ++obsoleted_files;
611   }
612 
613   if (obsoleted_files > 0) {
614     ROCKS_LOG_INFO(db_options_.info_log,
615                    "%" PRIu64 " blob file(s) marked obsolete by GC",
616                    obsoleted_files);
617     RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
618   }
619 }
620 
MarkUnreferencedBlobFilesObsolete()621 void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
622   const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
623 
624   MarkUnreferencedBlobFilesObsoleteImpl(
625       [=](const std::shared_ptr<BlobFile>& blob_file) {
626         WriteLock file_lock(&blob_file->mutex_);
627         return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
628       });
629 }
630 
MarkUnreferencedBlobFilesObsoleteDuringOpen()631 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
632   MarkUnreferencedBlobFilesObsoleteImpl(
633       [=](const std::shared_ptr<BlobFile>& blob_file) {
634         return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
635       });
636 }
637 
CloseRandomAccessLocked(const std::shared_ptr<BlobFile> & bfile)638 void BlobDBImpl::CloseRandomAccessLocked(
639     const std::shared_ptr<BlobFile>& bfile) {
640   bfile->CloseRandomAccessLocked();
641   open_file_count_--;
642 }
643 
GetBlobFileReader(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<RandomAccessFileReader> * reader)644 Status BlobDBImpl::GetBlobFileReader(
645     const std::shared_ptr<BlobFile>& blob_file,
646     std::shared_ptr<RandomAccessFileReader>* reader) {
647   assert(reader != nullptr);
648   bool fresh_open = false;
649   Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
650   if (s.ok() && fresh_open) {
651     assert(*reader != nullptr);
652     open_file_count_++;
653   }
654   return s;
655 }
656 
NewBlobFile(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason)657 std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
658     bool has_ttl, const ExpirationRange& expiration_range,
659     const std::string& reason) {
660   assert(has_ttl == (expiration_range.first || expiration_range.second));
661 
662   uint64_t file_num = next_file_number_++;
663 
664   const uint32_t column_family_id =
665       static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
666   auto blob_file = std::make_shared<BlobFile>(
667       this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
668       bdb_options_.compression, has_ttl, expiration_range);
669 
670   ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
671                   blob_file->PathName().c_str(), reason.c_str());
672   LogFlush(db_options_.info_log);
673 
674   return blob_file;
675 }
676 
RegisterBlobFile(std::shared_ptr<BlobFile> blob_file)677 void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
678   const uint64_t blob_file_number = blob_file->BlobFileNumber();
679 
680   auto it = blob_files_.lower_bound(blob_file_number);
681   assert(it == blob_files_.end() || it->first != blob_file_number);
682 
683   blob_files_.insert(it,
684                      std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
685                          blob_file_number, std::move(blob_file)));
686 }
687 
CreateWriterLocked(const std::shared_ptr<BlobFile> & bfile)688 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
689   std::string fpath(bfile->PathName());
690   std::unique_ptr<WritableFile> wfile;
691 
692   Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
693   if (!s.ok()) {
694     ROCKS_LOG_ERROR(db_options_.info_log,
695                     "Failed to open blob file for write: %s status: '%s'"
696                     " exists: '%s'",
697                     fpath.c_str(), s.ToString().c_str(),
698                     env_->FileExists(fpath).ToString().c_str());
699     return s;
700   }
701 
702   std::unique_ptr<WritableFileWriter> fwriter;
703   fwriter.reset(new WritableFileWriter(
704       NewLegacyWritableFileWrapper(std::move(wfile)), fpath, env_options_));
705 
706   uint64_t boffset = bfile->GetFileSize();
707   if (debug_level_ >= 2 && boffset) {
708     ROCKS_LOG_DEBUG(db_options_.info_log,
709                     "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
710                     boffset);
711   }
712 
713   Writer::ElemType et = Writer::kEtNone;
714   if (bfile->file_size_ == BlobLogHeader::kSize) {
715     et = Writer::kEtFileHdr;
716   } else if (bfile->file_size_ > BlobLogHeader::kSize) {
717     et = Writer::kEtRecord;
718   } else if (bfile->file_size_) {
719     ROCKS_LOG_WARN(db_options_.info_log,
720                    "Open blob file: %s with wrong size: %" PRIu64,
721                    fpath.c_str(), boffset);
722     return Status::Corruption("Invalid blob file size");
723   }
724 
725   bfile->log_writer_ = std::make_shared<Writer>(
726       std::move(fwriter), env_, statistics_, bfile->file_number_,
727       bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
728   bfile->log_writer_->last_elem_type_ = et;
729 
730   return s;
731 }
732 
FindBlobFileLocked(uint64_t expiration) const733 std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
734     uint64_t expiration) const {
735   if (open_ttl_files_.empty()) {
736     return nullptr;
737   }
738 
739   std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
740   tmp->SetHasTTL(true);
741   tmp->expiration_range_ = std::make_pair(expiration, 0);
742   tmp->file_number_ = std::numeric_limits<uint64_t>::max();
743 
744   auto citr = open_ttl_files_.equal_range(tmp);
745   if (citr.first == open_ttl_files_.end()) {
746     assert(citr.second == open_ttl_files_.end());
747 
748     std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
749     return (check->expiration_range_.second <= expiration) ? nullptr : check;
750   }
751 
752   if (citr.first != citr.second) {
753     return *(citr.first);
754   }
755 
756   auto finditr = citr.second;
757   if (finditr != open_ttl_files_.begin()) {
758     --finditr;
759   }
760 
761   bool b2 = (*finditr)->expiration_range_.second <= expiration;
762   bool b1 = (*finditr)->expiration_range_.first > expiration;
763 
764   return (b1 || b2) ? nullptr : (*finditr);
765 }
766 
CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<Writer> * writer)767 Status BlobDBImpl::CheckOrCreateWriterLocked(
768     const std::shared_ptr<BlobFile>& blob_file,
769     std::shared_ptr<Writer>* writer) {
770   assert(writer != nullptr);
771   *writer = blob_file->GetWriter();
772   if (*writer != nullptr) {
773     return Status::OK();
774   }
775   Status s = CreateWriterLocked(blob_file);
776   if (s.ok()) {
777     *writer = blob_file->GetWriter();
778   }
779   return s;
780 }
781 
CreateBlobFileAndWriter(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason,std::shared_ptr<BlobFile> * blob_file,std::shared_ptr<Writer> * writer)782 Status BlobDBImpl::CreateBlobFileAndWriter(
783     bool has_ttl, const ExpirationRange& expiration_range,
784     const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
785     std::shared_ptr<Writer>* writer) {
786   assert(has_ttl == (expiration_range.first || expiration_range.second));
787   assert(blob_file);
788   assert(writer);
789 
790   *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
791   assert(*blob_file);
792 
793   // file not visible, hence no lock
794   Status s = CheckOrCreateWriterLocked(*blob_file, writer);
795   if (!s.ok()) {
796     ROCKS_LOG_ERROR(db_options_.info_log,
797                     "Failed to get writer for blob file: %s, error: %s",
798                     (*blob_file)->PathName().c_str(), s.ToString().c_str());
799     return s;
800   }
801 
802   assert(*writer);
803 
804   s = (*writer)->WriteHeader((*blob_file)->header_);
805   if (!s.ok()) {
806     ROCKS_LOG_ERROR(db_options_.info_log,
807                     "Failed to write header to new blob file: %s"
808                     " status: '%s'",
809                     (*blob_file)->PathName().c_str(), s.ToString().c_str());
810     return s;
811   }
812 
813   (*blob_file)->SetFileSize(BlobLogHeader::kSize);
814   total_blob_size_ += BlobLogHeader::kSize;
815 
816   return s;
817 }
818 
SelectBlobFile(std::shared_ptr<BlobFile> * blob_file)819 Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
820   assert(blob_file);
821 
822   {
823     ReadLock rl(&mutex_);
824 
825     if (open_non_ttl_file_) {
826       assert(!open_non_ttl_file_->Immutable());
827       *blob_file = open_non_ttl_file_;
828       return Status::OK();
829     }
830   }
831 
832   // Check again
833   WriteLock wl(&mutex_);
834 
835   if (open_non_ttl_file_) {
836     assert(!open_non_ttl_file_->Immutable());
837     *blob_file = open_non_ttl_file_;
838     return Status::OK();
839   }
840 
841   std::shared_ptr<Writer> writer;
842   const Status s = CreateBlobFileAndWriter(
843       /* has_ttl */ false, ExpirationRange(),
844       /* reason */ "SelectBlobFile", blob_file, &writer);
845   if (!s.ok()) {
846     return s;
847   }
848 
849   RegisterBlobFile(*blob_file);
850   open_non_ttl_file_ = *blob_file;
851 
852   return s;
853 }
854 
SelectBlobFileTTL(uint64_t expiration,std::shared_ptr<BlobFile> * blob_file)855 Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
856                                      std::shared_ptr<BlobFile>* blob_file) {
857   assert(blob_file);
858   assert(expiration != kNoExpiration);
859 
860   {
861     ReadLock rl(&mutex_);
862 
863     *blob_file = FindBlobFileLocked(expiration);
864     if (*blob_file != nullptr) {
865       assert(!(*blob_file)->Immutable());
866       return Status::OK();
867     }
868   }
869 
870   // Check again
871   WriteLock wl(&mutex_);
872 
873   *blob_file = FindBlobFileLocked(expiration);
874   if (*blob_file != nullptr) {
875     assert(!(*blob_file)->Immutable());
876     return Status::OK();
877   }
878 
879   const uint64_t exp_low =
880       (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
881   const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
882   const ExpirationRange expiration_range(exp_low, exp_high);
883 
884   std::ostringstream oss;
885   oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
886 
887   std::shared_ptr<Writer> writer;
888   const Status s =
889       CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
890                               /* reason */ oss.str(), blob_file, &writer);
891   if (!s.ok()) {
892     return s;
893   }
894 
895   RegisterBlobFile(*blob_file);
896   open_ttl_files_.insert(*blob_file);
897 
898   return s;
899 }
900 
901 class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
902  private:
903   const WriteOptions& options_;
904   BlobDBImpl* blob_db_impl_;
905   uint32_t default_cf_id_;
906   WriteBatch batch_;
907 
908  public:
BlobInserter(const WriteOptions & options,BlobDBImpl * blob_db_impl,uint32_t default_cf_id)909   BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
910                uint32_t default_cf_id)
911       : options_(options),
912         blob_db_impl_(blob_db_impl),
913         default_cf_id_(default_cf_id) {}
914 
batch()915   WriteBatch* batch() { return &batch_; }
916 
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)917   Status PutCF(uint32_t column_family_id, const Slice& key,
918                const Slice& value) override {
919     if (column_family_id != default_cf_id_) {
920       return Status::NotSupported(
921           "Blob DB doesn't support non-default column family.");
922     }
923     Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
924                                            &batch_);
925     return s;
926   }
927 
DeleteCF(uint32_t column_family_id,const Slice & key)928   Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
929     if (column_family_id != default_cf_id_) {
930       return Status::NotSupported(
931           "Blob DB doesn't support non-default column family.");
932     }
933     Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
934     return s;
935   }
936 
DeleteRange(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)937   virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
938                              const Slice& end_key) {
939     if (column_family_id != default_cf_id_) {
940       return Status::NotSupported(
941           "Blob DB doesn't support non-default column family.");
942     }
943     Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
944                                                begin_key, end_key);
945     return s;
946   }
947 
SingleDeleteCF(uint32_t,const Slice &)948   Status SingleDeleteCF(uint32_t /*column_family_id*/,
949                         const Slice& /*key*/) override {
950     return Status::NotSupported("Not supported operation in blob db.");
951   }
952 
MergeCF(uint32_t,const Slice &,const Slice &)953   Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
954                  const Slice& /*value*/) override {
955     return Status::NotSupported("Not supported operation in blob db.");
956   }
957 
LogData(const Slice & blob)958   void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
959 };
960 
Write(const WriteOptions & options,WriteBatch * updates)961 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
962   StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
963   RecordTick(statistics_, BLOB_DB_NUM_WRITE);
964   uint32_t default_cf_id =
965       reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
966   Status s;
967   BlobInserter blob_inserter(options, this, default_cf_id);
968   {
969     // Release write_mutex_ before DB write to avoid race condition with
970     // flush begin listener, which also require write_mutex_ to sync
971     // blob files.
972     MutexLock l(&write_mutex_);
973     s = updates->Iterate(&blob_inserter);
974   }
975   if (!s.ok()) {
976     return s;
977   }
978   return db_->Write(options, blob_inserter.batch());
979 }
980 
Put(const WriteOptions & options,const Slice & key,const Slice & value)981 Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
982                        const Slice& value) {
983   return PutUntil(options, key, value, kNoExpiration);
984 }
985 
PutWithTTL(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t ttl)986 Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
987                               const Slice& key, const Slice& value,
988                               uint64_t ttl) {
989   uint64_t now = EpochNow();
990   uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
991   return PutUntil(options, key, value, expiration);
992 }
993 
PutUntil(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t expiration)994 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
995                             const Slice& value, uint64_t expiration) {
996   StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
997   RecordTick(statistics_, BLOB_DB_NUM_PUT);
998   Status s;
999   WriteBatch batch;
1000   {
1001     // Release write_mutex_ before DB write to avoid race condition with
1002     // flush begin listener, which also require write_mutex_ to sync
1003     // blob files.
1004     MutexLock l(&write_mutex_);
1005     s = PutBlobValue(options, key, value, expiration, &batch);
1006   }
1007   if (s.ok()) {
1008     s = db_->Write(options, &batch);
1009   }
1010   return s;
1011 }
1012 
PutBlobValue(const WriteOptions &,const Slice & key,const Slice & value,uint64_t expiration,WriteBatch * batch)1013 Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
1014                                 const Slice& key, const Slice& value,
1015                                 uint64_t expiration, WriteBatch* batch) {
1016   write_mutex_.AssertHeld();
1017   Status s;
1018   std::string index_entry;
1019   uint32_t column_family_id =
1020       reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
1021   if (value.size() < bdb_options_.min_blob_size) {
1022     if (expiration == kNoExpiration) {
1023       // Put as normal value
1024       s = batch->Put(key, value);
1025       RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
1026     } else {
1027       // Inlined with TTL
1028       BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
1029       s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1030                                            index_entry);
1031       RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
1032     }
1033   } else {
1034     std::string compression_output;
1035     Slice value_compressed = GetCompressedSlice(value, &compression_output);
1036 
1037     std::string headerbuf;
1038     Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
1039 
1040     // Check DB size limit before selecting blob file to
1041     // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
1042     // done before calling SelectBlobFile().
1043     s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
1044                                    value_compressed.size());
1045     if (!s.ok()) {
1046       return s;
1047     }
1048 
1049     std::shared_ptr<BlobFile> blob_file;
1050     if (expiration != kNoExpiration) {
1051       s = SelectBlobFileTTL(expiration, &blob_file);
1052     } else {
1053       s = SelectBlobFile(&blob_file);
1054     }
1055     if (s.ok()) {
1056       assert(blob_file != nullptr);
1057       assert(blob_file->GetCompressionType() == bdb_options_.compression);
1058       s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
1059                      &index_entry);
1060     }
1061     if (s.ok()) {
1062       if (expiration != kNoExpiration) {
1063         WriteLock file_lock(&blob_file->mutex_);
1064         blob_file->ExtendExpirationRange(expiration);
1065       }
1066       s = CloseBlobFileIfNeeded(blob_file);
1067     }
1068     if (s.ok()) {
1069       s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1070                                            index_entry);
1071     }
1072     if (s.ok()) {
1073       if (expiration == kNoExpiration) {
1074         RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
1075       } else {
1076         RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
1077       }
1078     } else {
1079       ROCKS_LOG_ERROR(
1080           db_options_.info_log,
1081           "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
1082           " status: '%s' blob_file: '%s'",
1083           blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
1084           s.ToString().c_str(), blob_file->DumpState().c_str());
1085     }
1086   }
1087 
1088   RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
1089   RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
1090   RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
1091   RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
1092 
1093   return s;
1094 }
1095 
// Returns the slice to store for `raw`. With compression disabled
// (kNoCompression) `raw` itself is returned; otherwise CompressBlock() is run
// with bdb_options_.compression and the returned slice refers to
// *compression_output, which therefore has to outlive it.
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
                                     std::string* compression_output) const {
  CompressionType compression = bdb_options_.compression;
  if (compression == kNoCompression) {
    return raw;
  }

  StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
  CompressionOptions compression_opts;
  CompressionContext compression_ctx(compression);
  CompressionInfo compression_info(compression_opts, compression_ctx,
                                   CompressionDict::GetEmptyDict(),
                                   compression,
                                   0 /* sample_for_compression */);
  CompressBlock(raw, compression_info, &compression,
                kBlockBasedTableVersionFormat, false, compression_output,
                nullptr, nullptr);
  return *compression_output;
}
1111 
CompactFiles(const CompactionOptions & compact_options,const std::vector<std::string> & input_file_names,const int output_level,const int output_path_id,std::vector<std::string> * const output_file_names,CompactionJobInfo * compaction_job_info)1112 Status BlobDBImpl::CompactFiles(
1113     const CompactionOptions& compact_options,
1114     const std::vector<std::string>& input_file_names, const int output_level,
1115     const int output_path_id, std::vector<std::string>* const output_file_names,
1116     CompactionJobInfo* compaction_job_info) {
1117   // Note: we need CompactionJobInfo to be able to track updates to the
1118   // blob file <-> SST mappings, so we provide one if the user hasn't,
1119   // assuming that GC is enabled.
1120   CompactionJobInfo info{};
1121   if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
1122     compaction_job_info = &info;
1123   }
1124 
1125   const Status s =
1126       db_->CompactFiles(compact_options, input_file_names, output_level,
1127                         output_path_id, output_file_names, compaction_job_info);
1128   if (!s.ok()) {
1129     return s;
1130   }
1131 
1132   if (bdb_options_.enable_garbage_collection) {
1133     assert(compaction_job_info);
1134     ProcessCompactionJobInfo(*compaction_job_info);
1135   }
1136 
1137   return s;
1138 }
1139 
GetCompactionContextCommon(BlobCompactionContext * context) const1140 void BlobDBImpl::GetCompactionContextCommon(
1141     BlobCompactionContext* context) const {
1142   assert(context);
1143 
1144   context->next_file_number = next_file_number_.load();
1145   context->current_blob_files.clear();
1146   for (auto& p : blob_files_) {
1147     context->current_blob_files.insert(p.first);
1148   }
1149   context->fifo_eviction_seq = fifo_eviction_seq_;
1150   context->evict_expiration_up_to = evict_expiration_up_to_;
1151 }
1152 
GetCompactionContext(BlobCompactionContext * context)1153 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
1154   assert(context);
1155 
1156   ReadLock l(&mutex_);
1157   GetCompactionContextCommon(context);
1158 }
1159 
GetCompactionContext(BlobCompactionContext * context,BlobCompactionContextGC * context_gc)1160 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
1161                                       BlobCompactionContextGC* context_gc) {
1162   assert(context);
1163   assert(context_gc);
1164 
1165   ReadLock l(&mutex_);
1166   GetCompactionContextCommon(context);
1167 
1168   context_gc->blob_db_impl = this;
1169 
1170   if (!live_imm_non_ttl_blob_files_.empty()) {
1171     auto it = live_imm_non_ttl_blob_files_.begin();
1172     std::advance(it, bdb_options_.garbage_collection_cutoff *
1173                          live_imm_non_ttl_blob_files_.size());
1174     context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
1175                                          ? it->first
1176                                          : std::numeric_limits<uint64_t>::max();
1177   }
1178 }
1179 
UpdateLiveSSTSize()1180 void BlobDBImpl::UpdateLiveSSTSize() {
1181   uint64_t live_sst_size = 0;
1182   bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
1183   if (ok) {
1184     live_sst_size_.store(live_sst_size);
1185     ROCKS_LOG_INFO(db_options_.info_log,
1186                    "Updated total SST file size: %" PRIu64 " bytes.",
1187                    live_sst_size);
1188   } else {
1189     ROCKS_LOG_ERROR(
1190         db_options_.info_log,
1191         "Failed to update total SST file size after flush or compaction.");
1192   }
1193   {
1194     // Trigger FIFO eviction if needed.
1195     MutexLock l(&write_mutex_);
1196     Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
1197     if (s.IsNoSpace()) {
1198       ROCKS_LOG_WARN(db_options_.info_log,
1199                      "DB grow out-of-space after SST size updated. Current live"
1200                      " SST size: %" PRIu64
1201                      " , current blob files size: %" PRIu64 ".",
1202                      live_sst_size_.load(), total_blob_size_.load());
1203     }
1204   }
1205 }
1206 
CheckSizeAndEvictBlobFiles(uint64_t blob_size,bool force_evict)1207 Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
1208                                               bool force_evict) {
1209   write_mutex_.AssertHeld();
1210 
1211   uint64_t live_sst_size = live_sst_size_.load();
1212   if (bdb_options_.max_db_size == 0 ||
1213       live_sst_size + total_blob_size_.load() + blob_size <=
1214           bdb_options_.max_db_size) {
1215     return Status::OK();
1216   }
1217 
1218   if (bdb_options_.is_fifo == false ||
1219       (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
1220     // FIFO eviction is disabled, or no space to insert new blob even we evict
1221     // all blob files.
1222     return Status::NoSpace(
1223         "Write failed, as writing it would exceed max_db_size limit.");
1224   }
1225 
1226   std::vector<std::shared_ptr<BlobFile>> candidate_files;
1227   CopyBlobFiles(&candidate_files);
1228   std::sort(candidate_files.begin(), candidate_files.end(),
1229             BlobFileComparator());
1230   fifo_eviction_seq_ = GetLatestSequenceNumber();
1231 
1232   WriteLock l(&mutex_);
1233 
1234   while (!candidate_files.empty() &&
1235          live_sst_size + total_blob_size_.load() + blob_size >
1236              bdb_options_.max_db_size) {
1237     std::shared_ptr<BlobFile> blob_file = candidate_files.back();
1238     candidate_files.pop_back();
1239     WriteLock file_lock(&blob_file->mutex_);
1240     if (blob_file->Obsolete()) {
1241       // File already obsoleted by someone else.
1242       assert(blob_file->Immutable());
1243       continue;
1244     }
1245     // FIFO eviction can evict open blob files.
1246     if (!blob_file->Immutable()) {
1247       Status s = CloseBlobFile(blob_file);
1248       if (!s.ok()) {
1249         return s;
1250       }
1251     }
1252     assert(blob_file->Immutable());
1253     auto expiration_range = blob_file->GetExpirationRange();
1254     ROCKS_LOG_INFO(db_options_.info_log,
1255                    "Evict oldest blob file since DB out of space. Current "
1256                    "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
1257                    ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
1258                    ".",
1259                    live_sst_size, total_blob_size_.load(),
1260                    bdb_options_.max_db_size, blob_file->BlobFileNumber());
1261     ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
1262     evict_expiration_up_to_ = expiration_range.first;
1263     RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
1264     RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
1265                blob_file->BlobCount());
1266     RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
1267                blob_file->GetFileSize());
1268     TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
1269   }
1270   if (live_sst_size + total_blob_size_.load() + blob_size >
1271       bdb_options_.max_db_size) {
1272     return Status::NoSpace(
1273         "Write failed, as writing it would exceed max_db_size limit.");
1274   }
1275   return Status::OK();
1276 }
1277 
AppendBlob(const std::shared_ptr<BlobFile> & bfile,const std::string & headerbuf,const Slice & key,const Slice & value,uint64_t expiration,std::string * index_entry)1278 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
1279                               const std::string& headerbuf, const Slice& key,
1280                               const Slice& value, uint64_t expiration,
1281                               std::string* index_entry) {
1282   Status s;
1283   uint64_t blob_offset = 0;
1284   uint64_t key_offset = 0;
1285   {
1286     WriteLock lockbfile_w(&bfile->mutex_);
1287     std::shared_ptr<Writer> writer;
1288     s = CheckOrCreateWriterLocked(bfile, &writer);
1289     if (!s.ok()) {
1290       return s;
1291     }
1292 
1293     // write the blob to the blob log.
1294     s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
1295                                    &blob_offset);
1296   }
1297 
1298   if (!s.ok()) {
1299     ROCKS_LOG_ERROR(db_options_.info_log,
1300                     "Invalid status in AppendBlob: %s status: '%s'",
1301                     bfile->PathName().c_str(), s.ToString().c_str());
1302     return s;
1303   }
1304 
1305   uint64_t size_put = headerbuf.size() + key.size() + value.size();
1306   bfile->BlobRecordAdded(size_put);
1307   total_blob_size_ += size_put;
1308 
1309   if (expiration == kNoExpiration) {
1310     BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
1311                           value.size(), bdb_options_.compression);
1312   } else {
1313     BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
1314                              blob_offset, value.size(),
1315                              bdb_options_.compression);
1316   }
1317 
1318   return s;
1319 }
1320 
MultiGet(const ReadOptions & read_options,const std::vector<Slice> & keys,std::vector<std::string> * values)1321 std::vector<Status> BlobDBImpl::MultiGet(
1322     const ReadOptions& read_options,
1323     const std::vector<Slice>& keys, std::vector<std::string>* values) {
1324   StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
1325   RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
1326   // Get a snapshot to avoid blob file get deleted between we
1327   // fetch and index entry and reading from the file.
1328   ReadOptions ro(read_options);
1329   bool snapshot_created = SetSnapshotIfNeeded(&ro);
1330 
1331   std::vector<Status> statuses;
1332   statuses.reserve(keys.size());
1333   values->clear();
1334   values->reserve(keys.size());
1335   PinnableSlice value;
1336   for (size_t i = 0; i < keys.size(); i++) {
1337     statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
1338     values->push_back(value.ToString());
1339     value.Reset();
1340   }
1341   if (snapshot_created) {
1342     db_->ReleaseSnapshot(ro.snapshot);
1343   }
1344   return statuses;
1345 }
1346 
SetSnapshotIfNeeded(ReadOptions * read_options)1347 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
1348   assert(read_options != nullptr);
1349   if (read_options->snapshot != nullptr) {
1350     return false;
1351   }
1352   read_options->snapshot = db_->GetSnapshot();
1353   return true;
1354 }
1355 
GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value,uint64_t * expiration)1356 Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
1357                                 PinnableSlice* value, uint64_t* expiration) {
1358   assert(value);
1359 
1360   BlobIndex blob_index;
1361   Status s = blob_index.DecodeFrom(index_entry);
1362   if (!s.ok()) {
1363     return s;
1364   }
1365 
1366   if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
1367     return Status::NotFound("Key expired");
1368   }
1369 
1370   if (expiration != nullptr) {
1371     if (blob_index.HasTTL()) {
1372       *expiration = blob_index.expiration();
1373     } else {
1374       *expiration = kNoExpiration;
1375     }
1376   }
1377 
1378   if (blob_index.IsInlined()) {
1379     // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
1380     // memory buffer to avoid extra copy.
1381     value->PinSelf(blob_index.value());
1382     return Status::OK();
1383   }
1384 
1385   CompressionType compression_type = kNoCompression;
1386   s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
1387                          blob_index.size(), value, &compression_type);
1388   if (!s.ok()) {
1389     return s;
1390   }
1391 
1392   if (compression_type != kNoCompression) {
1393     BlockContents contents;
1394     auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
1395 
1396     {
1397       StopWatch decompression_sw(env_, statistics_,
1398                                  BLOB_DB_DECOMPRESSION_MICROS);
1399       UncompressionContext context(compression_type);
1400       UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
1401                              compression_type);
1402       s = UncompressBlockContentsForCompressionType(
1403           info, value->data(), value->size(), &contents,
1404           kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
1405     }
1406 
1407     if (!s.ok()) {
1408       if (debug_level_ >= 2) {
1409         ROCKS_LOG_ERROR(
1410             db_options_.info_log,
1411             "Uncompression error during blob read from file: %" PRIu64
1412             " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1413             " key: %s status: '%s'",
1414             blob_index.file_number(), blob_index.offset(), blob_index.size(),
1415             key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1416       }
1417 
1418       return Status::Corruption("Unable to uncompress blob.");
1419     }
1420 
1421     value->PinSelf(contents.data);
1422   }
1423 
1424   return Status::OK();
1425 }
1426 
GetRawBlobFromFile(const Slice & key,uint64_t file_number,uint64_t offset,uint64_t size,PinnableSlice * value,CompressionType * compression_type)1427 Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
1428                                       uint64_t offset, uint64_t size,
1429                                       PinnableSlice* value,
1430                                       CompressionType* compression_type) {
1431   assert(value);
1432   assert(compression_type);
1433   assert(*compression_type == kNoCompression);
1434 
1435   if (!size) {
1436     value->PinSelf("");
1437     return Status::OK();
1438   }
1439 
1440   // offset has to have certain min, as we will read CRC
1441   // later from the Blob Header, which needs to be also a
1442   // valid offset.
1443   if (offset <
1444       (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
1445     if (debug_level_ >= 2) {
1446       ROCKS_LOG_ERROR(db_options_.info_log,
1447                       "Invalid blob index file_number: %" PRIu64
1448                       " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1449                       " key: %s",
1450                       file_number, offset, size,
1451                       key.ToString(/* output_hex */ true).c_str());
1452     }
1453 
1454     return Status::NotFound("Invalid blob offset");
1455   }
1456 
1457   std::shared_ptr<BlobFile> blob_file;
1458 
1459   {
1460     ReadLock rl(&mutex_);
1461     auto it = blob_files_.find(file_number);
1462 
1463     // file was deleted
1464     if (it == blob_files_.end()) {
1465       return Status::NotFound("Blob Not Found as blob file missing");
1466     }
1467 
1468     blob_file = it->second;
1469   }
1470 
1471   *compression_type = blob_file->GetCompressionType();
1472 
1473   // takes locks when called
1474   std::shared_ptr<RandomAccessFileReader> reader;
1475   Status s = GetBlobFileReader(blob_file, &reader);
1476   if (!s.ok()) {
1477     return s;
1478   }
1479 
1480   assert(offset >= key.size() + sizeof(uint32_t));
1481   const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
1482   const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
1483 
1484   // Allocate the buffer. This is safe in C++11
1485   std::string buf;
1486   AlignedBuf aligned_buf;
1487 
1488   // A partial blob record contain checksum, key and value.
1489   Slice blob_record;
1490 
1491   {
1492     StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
1493     if (reader->use_direct_io()) {
1494       s = reader->Read(record_offset, static_cast<size_t>(record_size),
1495                        &blob_record, nullptr, &aligned_buf);
1496     } else {
1497       buf.reserve(static_cast<size_t>(record_size));
1498       s = reader->Read(record_offset, static_cast<size_t>(record_size),
1499                        &blob_record, &buf[0], nullptr);
1500     }
1501     RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
1502   }
1503 
1504   if (!s.ok()) {
1505     ROCKS_LOG_DEBUG(
1506         db_options_.info_log,
1507         "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1508         ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
1509         file_number, offset, size, key.size(), s.ToString().c_str());
1510     return s;
1511   }
1512 
1513   if (blob_record.size() != record_size) {
1514     ROCKS_LOG_DEBUG(
1515         db_options_.info_log,
1516         "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1517         ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
1518         ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
1519         file_number, offset, size, key.size(), blob_record.size(), record_size);
1520 
1521     return Status::Corruption("Failed to retrieve blob from blob index.");
1522   }
1523 
1524   Slice crc_slice(blob_record.data(), sizeof(uint32_t));
1525   Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
1526                    static_cast<size_t>(size));
1527 
1528   uint32_t crc_exp = 0;
1529   if (!GetFixed32(&crc_slice, &crc_exp)) {
1530     ROCKS_LOG_DEBUG(
1531         db_options_.info_log,
1532         "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
1533         ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
1534         file_number, offset, size, key.size(), s.ToString().c_str());
1535     return Status::Corruption("Unable to decode checksum.");
1536   }
1537 
1538   uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
1539                                blob_record.size() - sizeof(uint32_t));
1540   crc = crc32c::Mask(crc);  // Adjust for storage
1541   if (crc != crc_exp) {
1542     if (debug_level_ >= 2) {
1543       ROCKS_LOG_ERROR(
1544           db_options_.info_log,
1545           "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
1546           " blob_size: %" PRIu64 " key: %s status: '%s'",
1547           file_number, offset, size,
1548           key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1549     }
1550 
1551     return Status::Corruption("Corruption. Blob CRC mismatch");
1552   }
1553 
1554   value->PinSelf(blob_value);
1555 
1556   return Status::OK();
1557 }
1558 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)1559 Status BlobDBImpl::Get(const ReadOptions& read_options,
1560                        ColumnFamilyHandle* column_family, const Slice& key,
1561                        PinnableSlice* value) {
1562   return Get(read_options, column_family, key, value,
1563              static_cast<uint64_t*>(nullptr) /*expiration*/);
1564 }
1565 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1566 Status BlobDBImpl::Get(const ReadOptions& read_options,
1567                        ColumnFamilyHandle* column_family, const Slice& key,
1568                        PinnableSlice* value, uint64_t* expiration) {
1569   StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
1570   RecordTick(statistics_, BLOB_DB_NUM_GET);
1571   return GetImpl(read_options, column_family, key, value, expiration);
1572 }
1573 
GetImpl(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1574 Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
1575                            ColumnFamilyHandle* column_family, const Slice& key,
1576                            PinnableSlice* value, uint64_t* expiration) {
1577   if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
1578     return Status::NotSupported(
1579         "Blob DB doesn't support non-default column family.");
1580   }
1581   // Get a snapshot to avoid blob file get deleted between we
1582   // fetch and index entry and reading from the file.
1583   // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1584   ReadOptions ro(read_options);
1585   bool snapshot_created = SetSnapshotIfNeeded(&ro);
1586 
1587   PinnableSlice index_entry;
1588   Status s;
1589   bool is_blob_index = false;
1590   DBImpl::GetImplOptions get_impl_options;
1591   get_impl_options.column_family = column_family;
1592   get_impl_options.value = &index_entry;
1593   get_impl_options.is_blob_index = &is_blob_index;
1594   s = db_impl_->GetImpl(ro, key, get_impl_options);
1595   if (expiration != nullptr) {
1596     *expiration = kNoExpiration;
1597   }
1598   RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
1599   if (s.ok()) {
1600     if (is_blob_index) {
1601       s = GetBlobValue(key, index_entry, value, expiration);
1602     } else {
1603       // The index entry is the value itself in this case.
1604       value->PinSelf(index_entry);
1605     }
1606     RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
1607   }
1608   if (snapshot_created) {
1609     db_->ReleaseSnapshot(ro.snapshot);
1610   }
1611   return s;
1612 }
1613 
SanityCheck(bool aborted)1614 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
1615   if (aborted) {
1616     return std::make_pair(false, -1);
1617   }
1618 
1619   ReadLock rl(&mutex_);
1620 
1621   ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
1622   ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
1623                  blob_files_.size());
1624   ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
1625                  open_ttl_files_.size());
1626 
1627   for (const auto& blob_file : open_ttl_files_) {
1628     (void)blob_file;
1629     assert(!blob_file->Immutable());
1630   }
1631 
1632   for (const auto& pair : live_imm_non_ttl_blob_files_) {
1633     const auto& blob_file = pair.second;
1634     (void)blob_file;
1635     assert(!blob_file->HasTTL());
1636     assert(blob_file->Immutable());
1637   }
1638 
1639   uint64_t now = EpochNow();
1640 
1641   for (auto blob_file_pair : blob_files_) {
1642     auto blob_file = blob_file_pair.second;
1643     char buf[1000];
1644     int pos = snprintf(buf, sizeof(buf),
1645                        "Blob file %" PRIu64 ", size %" PRIu64
1646                        ", blob count %" PRIu64 ", immutable %d",
1647                        blob_file->BlobFileNumber(), blob_file->GetFileSize(),
1648                        blob_file->BlobCount(), blob_file->Immutable());
1649     if (blob_file->HasTTL()) {
1650       ExpirationRange expiration_range;
1651 
1652       {
1653         ReadLock file_lock(&blob_file->mutex_);
1654         expiration_range = blob_file->GetExpirationRange();
1655       }
1656 
1657       pos += snprintf(buf + pos, sizeof(buf) - pos,
1658                       ", expiration range (%" PRIu64 ", %" PRIu64 ")",
1659                       expiration_range.first, expiration_range.second);
1660       if (!blob_file->Obsolete()) {
1661         pos += snprintf(buf + pos, sizeof(buf) - pos,
1662                         ", expire in %" PRIu64 " seconds",
1663                         expiration_range.second - now);
1664       }
1665     }
1666     if (blob_file->Obsolete()) {
1667       pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
1668                       blob_file->GetObsoleteSequence());
1669     }
1670     snprintf(buf + pos, sizeof(buf) - pos, ".");
1671     ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
1672   }
1673 
1674   // reschedule
1675   return std::make_pair(true, -1);
1676 }
1677 
CloseBlobFile(std::shared_ptr<BlobFile> bfile)1678 Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
1679   assert(bfile);
1680   assert(!bfile->Immutable());
1681   assert(!bfile->Obsolete());
1682 
1683   if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
1684     write_mutex_.AssertHeld();
1685   }
1686 
1687   ROCKS_LOG_INFO(db_options_.info_log,
1688                  "Closing blob file %" PRIu64 ". Path: %s",
1689                  bfile->BlobFileNumber(), bfile->PathName().c_str());
1690 
1691   const SequenceNumber sequence = GetLatestSequenceNumber();
1692 
1693   const Status s = bfile->WriteFooterAndCloseLocked(sequence);
1694 
1695   if (s.ok()) {
1696     total_blob_size_ += BlobLogFooter::kSize;
1697   } else {
1698     bfile->MarkImmutable(sequence);
1699 
1700     ROCKS_LOG_ERROR(db_options_.info_log,
1701                     "Failed to close blob file %" PRIu64 "with error: %s",
1702                     bfile->BlobFileNumber(), s.ToString().c_str());
1703   }
1704 
1705   if (bfile->HasTTL()) {
1706     size_t erased __attribute__((__unused__));
1707     erased = open_ttl_files_.erase(bfile);
1708   } else {
1709     if (bfile == open_non_ttl_file_) {
1710       open_non_ttl_file_ = nullptr;
1711     }
1712 
1713     const uint64_t blob_file_number = bfile->BlobFileNumber();
1714     auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
1715     assert(it == live_imm_non_ttl_blob_files_.end() ||
1716            it->first != blob_file_number);
1717     live_imm_non_ttl_blob_files_.insert(
1718         it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
1719                 blob_file_number, bfile));
1720   }
1721 
1722   return s;
1723 }
1724 
CloseBlobFileIfNeeded(std::shared_ptr<BlobFile> & bfile)1725 Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
1726   write_mutex_.AssertHeld();
1727 
1728   // atomic read
1729   if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
1730     return Status::OK();
1731   }
1732 
1733   WriteLock lock(&mutex_);
1734   WriteLock file_lock(&bfile->mutex_);
1735 
1736   assert(!bfile->Obsolete() || bfile->Immutable());
1737   if (bfile->Immutable()) {
1738     return Status::OK();
1739   }
1740 
1741   return CloseBlobFile(bfile);
1742 }
1743 
ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,SequenceNumber obsolete_seq,bool update_size)1744 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
1745                                   SequenceNumber obsolete_seq,
1746                                   bool update_size) {
1747   assert(blob_file->Immutable());
1748   assert(!blob_file->Obsolete());
1749 
1750   // Should hold write lock of mutex_ or during DB open.
1751   blob_file->MarkObsolete(obsolete_seq);
1752   obsolete_files_.push_back(blob_file);
1753   assert(total_blob_size_.load() >= blob_file->GetFileSize());
1754   if (update_size) {
1755     total_blob_size_ -= blob_file->GetFileSize();
1756   }
1757 }
1758 
VisibleToActiveSnapshot(const std::shared_ptr<BlobFile> & bfile)1759 bool BlobDBImpl::VisibleToActiveSnapshot(
1760     const std::shared_ptr<BlobFile>& bfile) {
1761   assert(bfile->Obsolete());
1762 
1763   // We check whether the oldest snapshot is no less than the last sequence
1764   // by the time the blob file become obsolete. If so, the blob file is not
1765   // visible to all existing snapshots.
1766   //
1767   // If we keep track of the earliest sequence of the keys in the blob file,
1768   // we could instead check if there's a snapshot falls in range
1769   // [earliest_sequence, obsolete_sequence). But doing so will make the
1770   // implementation more complicated.
1771   SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
1772   SequenceNumber oldest_snapshot = kMaxSequenceNumber;
1773   {
1774     // Need to lock DBImpl mutex before access snapshot list.
1775     InstrumentedMutexLock l(db_impl_->mutex());
1776     auto& snapshots = db_impl_->snapshots();
1777     if (!snapshots.empty()) {
1778       oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
1779     }
1780   }
1781   bool visible = oldest_snapshot < obsolete_sequence;
1782   if (visible) {
1783     ROCKS_LOG_INFO(db_options_.info_log,
1784                    "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
1785                    ") visible to oldest snapshot %" PRIu64 ".",
1786                    bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
1787   }
1788   return visible;
1789 }
1790 
EvictExpiredFiles(bool aborted)1791 std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
1792   if (aborted) {
1793     return std::make_pair(false, -1);
1794   }
1795 
1796   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
1797   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
1798 
1799   std::vector<std::shared_ptr<BlobFile>> process_files;
1800   uint64_t now = EpochNow();
1801   {
1802     ReadLock rl(&mutex_);
1803     for (auto p : blob_files_) {
1804       auto& blob_file = p.second;
1805       ReadLock file_lock(&blob_file->mutex_);
1806       if (blob_file->HasTTL() && !blob_file->Obsolete() &&
1807           blob_file->GetExpirationRange().second <= now) {
1808         process_files.push_back(blob_file);
1809       }
1810     }
1811   }
1812 
1813   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
1814   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
1815   TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
1816 
1817   SequenceNumber seq = GetLatestSequenceNumber();
1818   {
1819     MutexLock l(&write_mutex_);
1820     WriteLock lock(&mutex_);
1821     for (auto& blob_file : process_files) {
1822       WriteLock file_lock(&blob_file->mutex_);
1823 
1824       // Need to double check if the file is obsolete.
1825       if (blob_file->Obsolete()) {
1826         assert(blob_file->Immutable());
1827         continue;
1828       }
1829 
1830       if (!blob_file->Immutable()) {
1831         CloseBlobFile(blob_file);
1832       }
1833 
1834       assert(blob_file->Immutable());
1835 
1836       ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
1837     }
1838   }
1839 
1840   return std::make_pair(true, -1);
1841 }
1842 
SyncBlobFiles()1843 Status BlobDBImpl::SyncBlobFiles() {
1844   MutexLock l(&write_mutex_);
1845 
1846   std::vector<std::shared_ptr<BlobFile>> process_files;
1847   {
1848     ReadLock rl(&mutex_);
1849     for (auto fitr : open_ttl_files_) {
1850       process_files.push_back(fitr);
1851     }
1852     if (open_non_ttl_file_ != nullptr) {
1853       process_files.push_back(open_non_ttl_file_);
1854     }
1855   }
1856 
1857   Status s;
1858   for (auto& blob_file : process_files) {
1859     s = blob_file->Fsync();
1860     if (!s.ok()) {
1861       ROCKS_LOG_ERROR(db_options_.info_log,
1862                       "Failed to sync blob file %" PRIu64 ", status: %s",
1863                       blob_file->BlobFileNumber(), s.ToString().c_str());
1864       return s;
1865     }
1866   }
1867 
1868   s = dir_ent_->Fsync();
1869   if (!s.ok()) {
1870     ROCKS_LOG_ERROR(db_options_.info_log,
1871                     "Failed to sync blob directory, status: %s",
1872                     s.ToString().c_str());
1873   }
1874   return s;
1875 }
1876 
ReclaimOpenFiles(bool aborted)1877 std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
1878   if (aborted) return std::make_pair(false, -1);
1879 
1880   if (open_file_count_.load() < kOpenFilesTrigger) {
1881     return std::make_pair(true, -1);
1882   }
1883 
1884   // in the future, we should sort by last_access_
1885   // instead of closing every file
1886   ReadLock rl(&mutex_);
1887   for (auto const& ent : blob_files_) {
1888     auto bfile = ent.second;
1889     if (bfile->last_access_.load() == -1) continue;
1890 
1891     WriteLock lockbfile_w(&bfile->mutex_);
1892     CloseRandomAccessLocked(bfile);
1893   }
1894 
1895   return std::make_pair(true, -1);
1896 }
1897 
DeleteObsoleteFiles(bool aborted)1898 std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
1899   if (aborted) {
1900     return std::make_pair(false, -1);
1901   }
1902 
1903   MutexLock delete_file_lock(&delete_file_mutex_);
1904   if (disable_file_deletions_ > 0) {
1905     return std::make_pair(true, -1);
1906   }
1907 
1908   std::list<std::shared_ptr<BlobFile>> tobsolete;
1909   {
1910     WriteLock wl(&mutex_);
1911     if (obsolete_files_.empty()) {
1912       return std::make_pair(true, -1);
1913     }
1914     tobsolete.swap(obsolete_files_);
1915   }
1916 
1917   bool file_deleted = false;
1918   for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
1919     auto bfile = *iter;
1920     {
1921       ReadLock lockbfile_r(&bfile->mutex_);
1922       if (VisibleToActiveSnapshot(bfile)) {
1923         ROCKS_LOG_INFO(db_options_.info_log,
1924                        "Could not delete file due to snapshot failure %s",
1925                        bfile->PathName().c_str());
1926         ++iter;
1927         continue;
1928       }
1929     }
1930     ROCKS_LOG_INFO(db_options_.info_log,
1931                    "Will delete file due to snapshot success %s",
1932                    bfile->PathName().c_str());
1933 
1934     {
1935       WriteLock wl(&mutex_);
1936       blob_files_.erase(bfile->BlobFileNumber());
1937     }
1938 
1939     Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
1940                             bfile->PathName(), blob_dir_, true,
1941                             /*force_fg=*/false);
1942     if (!s.ok()) {
1943       ROCKS_LOG_ERROR(db_options_.info_log,
1944                       "File failed to be deleted as obsolete %s",
1945                       bfile->PathName().c_str());
1946       ++iter;
1947       continue;
1948     }
1949 
1950     file_deleted = true;
1951     ROCKS_LOG_INFO(db_options_.info_log,
1952                    "File deleted as obsolete from blob dir %s",
1953                    bfile->PathName().c_str());
1954 
1955     iter = tobsolete.erase(iter);
1956   }
1957 
1958   // directory change. Fsync
1959   if (file_deleted) {
1960     Status s = dir_ent_->Fsync();
1961     if (!s.ok()) {
1962       ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
1963                       blob_dir_.c_str(), s.ToString().c_str());
1964     }
1965   }
1966 
1967   // put files back into obsolete if for some reason, delete failed
1968   if (!tobsolete.empty()) {
1969     WriteLock wl(&mutex_);
1970     for (auto bfile : tobsolete) {
1971       blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
1972       obsolete_files_.push_front(bfile);
1973     }
1974   }
1975 
1976   return std::make_pair(!aborted, -1);
1977 }
1978 
CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>> * bfiles_copy)1979 void BlobDBImpl::CopyBlobFiles(
1980     std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
1981   ReadLock rl(&mutex_);
1982   for (auto const& p : blob_files_) {
1983     bfiles_copy->push_back(p.second);
1984   }
1985 }
1986 
NewIterator(const ReadOptions & read_options)1987 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
1988   auto* cfd =
1989       reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
1990   // Get a snapshot to avoid blob file get deleted between we
1991   // fetch and index entry and reading from the file.
1992   ManagedSnapshot* own_snapshot = nullptr;
1993   const Snapshot* snapshot = read_options.snapshot;
1994   if (snapshot == nullptr) {
1995     own_snapshot = new ManagedSnapshot(db_);
1996     snapshot = own_snapshot->snapshot();
1997   }
1998   auto* iter = db_impl_->NewIteratorImpl(
1999       read_options, cfd, snapshot->GetSequenceNumber(),
2000       nullptr /*read_callback*/, true /*allow_blob*/);
2001   return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
2002 }
2003 
DestroyBlobDB(const std::string & dbname,const Options & options,const BlobDBOptions & bdb_options)2004 Status DestroyBlobDB(const std::string& dbname, const Options& options,
2005                      const BlobDBOptions& bdb_options) {
2006   const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
2007   Env* env = soptions.env;
2008 
2009   Status status;
2010   std::string blobdir;
2011   blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
2012                                         : bdb_options.blob_dir;
2013 
2014   std::vector<std::string> filenames;
2015   env->GetChildren(blobdir, &filenames);
2016 
2017   for (const auto& f : filenames) {
2018     uint64_t number;
2019     FileType type;
2020     if (ParseFileName(f, &number, &type) && type == kBlobFile) {
2021       Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
2022                                 /*force_fg=*/false);
2023       if (status.ok() && !del.ok()) {
2024         status = del;
2025       }
2026     }
2027   }
2028   env->DeleteDir(blobdir);
2029 
2030   Status destroy = DestroyDB(dbname, options);
2031   if (status.ok() && !destroy.ok()) {
2032     status = destroy;
2033   }
2034 
2035   return status;
2036 }
2037 
2038 #ifndef NDEBUG
TEST_GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value)2039 Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
2040                                      PinnableSlice* value) {
2041   return GetBlobValue(key, index_entry, value);
2042 }
2043 
TEST_AddDummyBlobFile(uint64_t blob_file_number,SequenceNumber immutable_sequence)2044 void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
2045                                        SequenceNumber immutable_sequence) {
2046   auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
2047                                               db_options_.info_log.get());
2048   blob_file->MarkImmutable(immutable_sequence);
2049 
2050   blob_files_[blob_file_number] = blob_file;
2051   live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
2052 }
2053 
TEST_GetBlobFiles() const2054 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
2055   ReadLock l(&mutex_);
2056   std::vector<std::shared_ptr<BlobFile>> blob_files;
2057   for (auto& p : blob_files_) {
2058     blob_files.emplace_back(p.second);
2059   }
2060   return blob_files;
2061 }
2062 
TEST_GetLiveImmNonTTLFiles() const2063 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
2064     const {
2065   ReadLock l(&mutex_);
2066   std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
2067   for (const auto& pair : live_imm_non_ttl_blob_files_) {
2068     live_imm_non_ttl_files.emplace_back(pair.second);
2069   }
2070   return live_imm_non_ttl_files;
2071 }
2072 
TEST_GetObsoleteFiles() const2073 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
2074     const {
2075   ReadLock l(&mutex_);
2076   std::vector<std::shared_ptr<BlobFile>> obsolete_files;
2077   for (auto& bfile : obsolete_files_) {
2078     obsolete_files.emplace_back(bfile);
2079   }
2080   return obsolete_files;
2081 }
2082 
TEST_DeleteObsoleteFiles()2083 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
2084   DeleteObsoleteFiles(false /*abort*/);
2085 }
2086 
TEST_CloseBlobFile(std::shared_ptr<BlobFile> & bfile)2087 Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
2088   MutexLock l(&write_mutex_);
2089   WriteLock lock(&mutex_);
2090   WriteLock file_lock(&bfile->mutex_);
2091 
2092   return CloseBlobFile(bfile);
2093 }
2094 
TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq,bool update_size)2095 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
2096                                        SequenceNumber obsolete_seq,
2097                                        bool update_size) {
2098   return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
2099 }
2100 
TEST_EvictExpiredFiles()2101 void BlobDBImpl::TEST_EvictExpiredFiles() {
2102   EvictExpiredFiles(false /*abort*/);
2103 }
2104 
TEST_live_sst_size()2105 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
2106 
TEST_InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)2107 void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
2108     const std::vector<LiveFileMetaData>& live_files) {
2109   InitializeBlobFileToSstMapping(live_files);
2110 }
2111 
TEST_ProcessFlushJobInfo(const FlushJobInfo & info)2112 void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
2113   ProcessFlushJobInfo(info);
2114 }
2115 
TEST_ProcessCompactionJobInfo(const CompactionJobInfo & info)2116 void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
2117   ProcessCompactionJobInfo(info);
2118 }
2119 
2120 #endif  //  !NDEBUG
2121 
2122 }  // namespace blob_db
2123 }  // namespace ROCKSDB_NAMESPACE
2124 #endif  // ROCKSDB_LITE
2125