//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/table_cache.h"

#include "cache/simple_deleter.h"
#include "db/dbformat.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/statistics.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/table_reader.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

namespace {

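// Releases a table cache handle obtained from a Lookup()/FindTable() call.
// Registered as a cleanup callback so that an iterator keeps its cached
// TableReader pinned for the iterator's lifetime (see NewIterator below).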
static void UnrefEntry(void* arg1, void* arg2) {
  Cache* cache = reinterpret_cast<Cache*>(arg1);
  Cache::Handle* h = reinterpret_cast<Cache::Handle*>(arg2);
  cache->Release(h);
}

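// The table cache is keyed by the file number's in-memory byte
// representation.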
static Slice GetSliceForFileNumber(const uint64_t* file_number) {
  return Slice(reinterpret_cast<const char*>(file_number),
               sizeof(*file_number));
}

#ifndef ROCKSDB_LITE

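// Appends a varint64-encoded value to an IterKey; used below to build row
// cache keys.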
void AppendVarint64(IterKey* key, uint64_t v) {
  char buf[10];
  auto ptr = EncodeVarint64(buf, v);
  key->TrimAppend(key->Size(), buf, ptr - buf);
}

#endif  // ROCKSDB_LITE

}  // namespace

TableCache::TableCache(const ImmutableCFOptions& ioptions,
                       const FileOptions& file_options, Cache* const cache,
                       BlockCacheTracer* const block_cache_tracer)
    : ioptions_(ioptions),
      file_options_(file_options),
      cache_(cache),
      immortal_tables_(false),
      block_cache_tracer_(block_cache_tracer) {
  if (ioptions_.row_cache) {
    // If the same cache is shared by multiple instances, we need to
    // disambiguate its entries.
    PutVarint64(&row_cache_id_, ioptions_.row_cache->NewId());
  }
}

TableCache::~TableCache() {
}

TableReader* TableCache::GetTableReaderFromHandle(Cache::Handle* handle) {
  return reinterpret_cast<TableReader*>(cache_->Value(handle));
}

void TableCache::ReleaseHandle(Cache::Handle* handle) {
  cache_->Release(handle);
}

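// Opens the table file referenced by `fd` (falling back to the legacy
// LevelDB-style file name if the primary name is not found) and asks the
// configured table factory to build a TableReader for it.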
Status TableCache::GetTableReader(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist,
    std::unique_ptr<TableReader>* table_reader,
    const SliceTransform* prefix_extractor, bool skip_filters, int level,
    bool prefetch_index_and_filter_in_cache) {
  std::string fname =
      TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
  std::unique_ptr<FSRandomAccessFile> file;
  Status s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
                                               nullptr);
  RecordTick(ioptions_.statistics, NO_FILE_OPENS);
  if (s.IsPathNotFound()) {
    fname = Rocks2LevelTableFileName(fname);
    s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file, nullptr);
    RecordTick(ioptions_.statistics, NO_FILE_OPENS);
  }

  if (s.ok()) {
    if (!sequential_mode && ioptions_.advise_random_on_open) {
      file->Hint(FSRandomAccessFile::kRandom);
    }
    StopWatch sw(ioptions_.env, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
    std::unique_ptr<RandomAccessFileReader> file_reader(
        new RandomAccessFileReader(
            std::move(file), fname, ioptions_.env,
            record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
            file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
    s = ioptions_.table_factory->NewTableReader(
        TableReaderOptions(ioptions_, prefix_extractor, file_options,
                           internal_comparator, skip_filters, immortal_tables_,
                           level, fd.largest_seqno, block_cache_tracer_),
        std::move(file_reader), fd.GetFileSize(), table_reader,
        prefetch_index_and_filter_in_cache);
    TEST_SYNC_POINT("TableCache::GetTableReader:0");
  }
  return s;
}

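// Releases the caller's handle and erases the file's entry from the cache;
// the TableReader is destroyed once all outstanding references are gone.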
void TableCache::EraseHandle(const FileDescriptor& fd, Cache::Handle* handle) {
  ReleaseHandle(handle);
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  cache_->Erase(key);
}

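// Looks up the file in the table cache. On a miss, opens the file and
// inserts a new TableReader, unless `no_io` is set, in which case
// Status::Incomplete is returned. Failed opens are not cached, so transient
// errors can recover on a later attempt.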
Status TableCache::FindTable(const FileOptions& file_options,
                             const InternalKeyComparator& internal_comparator,
                             const FileDescriptor& fd, Cache::Handle** handle,
                             const SliceTransform* prefix_extractor,
                             const bool no_io, bool record_read_stats,
                             HistogramImpl* file_read_hist, bool skip_filters,
                             int level,
                             bool prefetch_index_and_filter_in_cache) {
  PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
  Status s;
  uint64_t number = fd.GetNumber();
  Slice key = GetSliceForFileNumber(&number);
  *handle = cache_->Lookup(key);
  TEST_SYNC_POINT_CALLBACK("TableCache::FindTable:0",
                           const_cast<bool*>(&no_io));

  if (*handle == nullptr) {
    if (no_io) {  // Don't do IO and return a not-found status
      return Status::Incomplete("Table not found in table_cache, no_io is set");
    }
    std::unique_ptr<TableReader> table_reader;
    s = GetTableReader(file_options, internal_comparator, fd,
                       false /* sequential mode */, record_read_stats,
                       file_read_hist, &table_reader, prefix_extractor,
                       skip_filters, level, prefetch_index_and_filter_in_cache);
    if (!s.ok()) {
      assert(table_reader == nullptr);
      RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
      // We do not cache error results so that if the error is transient,
      // or somebody repairs the file, we recover automatically.
    } else {
      s = cache_->Insert(key, table_reader.get(), 1,
                         SimpleDeleter<TableReader>::GetInstance(), handle);
      if (s.ok()) {
        // Release ownership of table reader.
        table_reader.release();
      }
    }
  }
  return s;
}

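// Returns an iterator over the table, pinning the cache entry until the
// iterator is destroyed. When requested, the file's range tombstones are
// also fed into `range_del_agg`.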
InternalIterator* TableCache::NewIterator(
    const ReadOptions& options, const FileOptions& file_options,
    const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
    RangeDelAggregator* range_del_agg, const SliceTransform* prefix_extractor,
    TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
    TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
    const InternalKey* smallest_compaction_key,
    const InternalKey* largest_compaction_key) {
  PERF_TIMER_GUARD(new_table_iterator_nanos);

  Status s;
  TableReader* table_reader = nullptr;
  Cache::Handle* handle = nullptr;
  if (table_reader_ptr != nullptr) {
    *table_reader_ptr = nullptr;
  }
  bool for_compaction = caller == TableReaderCaller::kCompaction;
  auto& fd = file_meta.fd;
  table_reader = fd.table_reader;
  if (table_reader == nullptr) {
    s = FindTable(file_options, icomparator, fd, &handle, prefix_extractor,
                  options.read_tier == kBlockCacheTier /* no_io */,
                  !for_compaction /* record_read_stats */, file_read_hist,
                  skip_filters, level);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(handle);
    }
  }
  InternalIterator* result = nullptr;
  if (s.ok()) {
    if (options.table_filter &&
        !options.table_filter(*table_reader->GetTableProperties())) {
      result = NewEmptyInternalIterator<Slice>(arena);
    } else {
      result = table_reader->NewIterator(
          options, prefix_extractor, arena, skip_filters, caller,
          file_options.compaction_readahead_size);
    }
    if (handle != nullptr) {
      result->RegisterCleanup(&UnrefEntry, cache_, handle);
      handle = nullptr;  // prevent from releasing below
    }

    if (for_compaction) {
      table_reader->SetupForCompaction();
    }
    if (table_reader_ptr != nullptr) {
      *table_reader_ptr = table_reader;
    }
  }
  if (s.ok() && range_del_agg != nullptr && !options.ignore_range_deletions) {
    if (range_del_agg->AddFile(fd.GetNumber())) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          static_cast<FragmentedRangeTombstoneIterator*>(
              table_reader->NewRangeTombstoneIterator(options)));
      if (range_del_iter != nullptr) {
        s = range_del_iter->status();
      }
      if (s.ok()) {
        const InternalKey* smallest = &file_meta.smallest;
        const InternalKey* largest = &file_meta.largest;
        if (smallest_compaction_key != nullptr) {
          smallest = smallest_compaction_key;
        }
        if (largest_compaction_key != nullptr) {
          largest = largest_compaction_key;
        }
        range_del_agg->AddTombstones(std::move(range_del_iter), smallest,
                                     largest);
      }
    }
  }

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  if (!s.ok()) {
    assert(result == nullptr);
    result = NewErrorInternalIterator<Slice>(s, arena);
  }
  return result;
}

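// Fills `out_iter` with the table's fragmented range tombstone iterator;
// note that the result holds nullptr when the table has no range
// tombstones.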
Status TableCache::GetRangeTombstoneIterator(
    const ReadOptions& options,
    const InternalKeyComparator& internal_comparator,
    const FileMetaData& file_meta,
    std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter) {
  const FileDescriptor& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (t == nullptr) {
    s = FindTable(file_options_, internal_comparator, fd, &handle);
    if (s.ok()) {
      t = GetTableReaderFromHandle(handle);
    }
  }
  if (s.ok()) {
    out_iter->reset(t->NewRangeTombstoneIterator(options));
    assert(out_iter);
  }
  return s;
}

#ifndef ROCKSDB_LITE
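// Builds the row cache key prefix: row cache id + file number + sequence
// number. The per-lookup user key suffix is appended by GetFromRowCache().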
void TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
                                         const FileDescriptor& fd,
                                         const Slice& internal_key,
                                         GetContext* get_context,
                                         IterKey& row_cache_key) {
  uint64_t fd_number = fd.GetNumber();
  // We use the user key as cache key instead of the internal key,
  // otherwise the whole cache would be invalidated every time the
  // sequence key increases. However, to support caching snapshot
  // reads, we append the sequence number (incremented by 1 to
  // distinguish from 0) only in this case.
  // If the snapshot is larger than the largest seqno in the file,
  // all data should be exposed to the snapshot, so we treat it
  // the same as there is no snapshot. The exception is that if
  // a seq-checking callback is registered, some internal keys
  // may still be filtered out.
  uint64_t seq_no = 0;
  // Maybe we can include the whole file if snapshot == fd.largest_seqno.
  if (options.snapshot != nullptr &&
      (get_context->has_callback() ||
       static_cast_with_check<const SnapshotImpl, const Snapshot>(
           options.snapshot)
               ->GetSequenceNumber() <= fd.largest_seqno)) {
    // We should consider using options.snapshot->GetSequenceNumber()
    // instead of GetInternalKeySeqno(k), which would make the code
    // easier to understand.
    seq_no = 1 + GetInternalKeySeqno(internal_key);
  }

  // Compute row cache key.
  row_cache_key.TrimAppend(row_cache_key.Size(), row_cache_id_.data(),
                           row_cache_id_.size());
  AppendVarint64(&row_cache_key, fd_number);
  AppendVarint64(&row_cache_key, seq_no);
}

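// Probes the row cache for `user_key` under the previously built key prefix.
// On a hit, replays the cached GetContext log so the caller observes the
// same result as a table lookup; returns true on a row cache hit.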
bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
                                 size_t prefix_size, GetContext* get_context) {
  bool found = false;

  row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
  if (auto row_handle =
          ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
    // Cleanable routine to release the cache entry
    Cleanable value_pinner;
    auto release_cache_entry_func = [](void* cache_to_clean,
                                       void* cache_handle) {
      ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
    };
    auto found_row_cache_entry =
        static_cast<const std::string*>(ioptions_.row_cache->Value(row_handle));
    // If we reach this point, the value is located in the cache.
    // found_row_cache_entry points to the value in the cache,
    // and value_pinner holds the cleanup procedure for the cached entry.
    // After replayGetContextLog() returns, get_context.pinnable_slice_
    // will point to the cache entry buffer (or a copy based on it) and the
    // cleanup routine under value_pinner will be delegated to
    // get_context.pinnable_slice_. The cache entry is released when
    // get_context.pinnable_slice_ is reset.
    value_pinner.RegisterCleanup(release_cache_entry_func,
                                 ioptions_.row_cache.get(), row_handle);
    replayGetContextLog(*found_row_cache_entry, user_key, get_context,
                        &value_pinner);
    RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
    found = true;
  } else {
    RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
  }
  return found;
}
#endif  // ROCKSDB_LITE

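// Point lookup for key `k`: consults the row cache first (when enabled),
// then the table itself, recording a replay log so that a successful read
// can populate the row cache afterwards.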
Status TableCache::Get(const ReadOptions& options,
                       const InternalKeyComparator& internal_comparator,
                       const FileMetaData& file_meta, const Slice& k,
                       GetContext* get_context,
                       const SliceTransform* prefix_extractor,
                       HistogramImpl* file_read_hist, bool skip_filters,
                       int level) {
  auto& fd = file_meta.fd;
  std::string* row_cache_entry = nullptr;
  bool done = false;
#ifndef ROCKSDB_LITE
  IterKey row_cache_key;
  std::string row_cache_entry_buffer;

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
    auto user_key = ExtractUserKey(k);
    CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
    done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
                           get_context);
    if (!done) {
      row_cache_entry = &row_cache_entry_buffer;
    }
  }
#endif  // ROCKSDB_LITE
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (!done && s.ok()) {
    if (t == nullptr) {
      s = FindTable(
          file_options_, internal_comparator, fd, &handle, prefix_extractor,
          options.read_tier == kBlockCacheTier /* no_io */,
          true /* record_read_stats */, file_read_hist, skip_filters, level);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
      }
    }
    SequenceNumber* max_covering_tombstone_seq =
        get_context->max_covering_tombstone_seq();
    if (s.ok() && max_covering_tombstone_seq != nullptr &&
        !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        *max_covering_tombstone_seq = std::max(
            *max_covering_tombstone_seq,
            range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
      }
    }
    if (s.ok()) {
      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
      s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
      get_context->SetReplayLog(nullptr);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      // Couldn't find the table in the cache, but treat the key as kFound
      // since no_io is set.
      get_context->MarkKeyMayExist();
      s = Status::OK();
      done = true;
    }
  }

#ifndef ROCKSDB_LITE
  // Put the replay log in the row cache only if something was found.
  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
    size_t charge =
        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
    void* row_ptr = new std::string(std::move(*row_cache_entry));
    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                                SimpleDeleter<std::string>::GetInstance());
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

// Batched version of TableCache::Get.
Status TableCache::MultiGet(const ReadOptions& options,
                            const InternalKeyComparator& internal_comparator,
                            const FileMetaData& file_meta,
                            const MultiGetContext::Range* mget_range,
                            const SliceTransform* prefix_extractor,
                            HistogramImpl* file_read_hist, bool skip_filters,
                            int level) {
  auto& fd = file_meta.fd;
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  MultiGetRange table_range(*mget_range, mget_range->begin(),
                            mget_range->end());
#ifndef ROCKSDB_LITE
  autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
  IterKey row_cache_key;
  size_t row_cache_key_prefix_size = 0;
  KeyContext& first_key = *table_range.begin();
  bool lookup_row_cache =
      ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (lookup_row_cache) {
    GetContext* first_context = first_key.get_context;
    CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
                            row_cache_key);
    row_cache_key_prefix_size = row_cache_key.Size();

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      const Slice& user_key = miter->ukey;
      GetContext* get_context = miter->get_context;

      if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
                          get_context)) {
        table_range.SkipKey(miter);
      } else {
        row_cache_entries.emplace_back();
        get_context->SetReplayLog(&(row_cache_entries.back()));
      }
    }
  }
#endif  // ROCKSDB_LITE

  // Check that table_range is not empty. It's possible all keys may have been
  // found in the row cache and thus the range may now be empty.
  if (s.ok() && !table_range.empty()) {
    if (t == nullptr) {
      s = FindTable(
          file_options_, internal_comparator, fd, &handle, prefix_extractor,
          options.read_tier == kBlockCacheTier /* no_io */,
          true /* record_read_stats */, file_read_hist, skip_filters, level);
      TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
        assert(t);
      }
    }
    if (s.ok() && !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        for (auto iter = table_range.begin(); iter != table_range.end();
             ++iter) {
          SequenceNumber* max_covering_tombstone_seq =
              iter->get_context->max_covering_tombstone_seq();
          *max_covering_tombstone_seq =
              std::max(*max_covering_tombstone_seq,
                       range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey));
        }
      }
    }
    if (s.ok()) {
      t->MultiGet(options, &table_range, prefix_extractor, skip_filters);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
        Status* status = iter->s;
        if (status->IsIncomplete()) {
          // Couldn't find the table in the cache, but treat the key as
          // kFound since no_io is set.
          iter->get_context->MarkKeyMayExist();
          s = Status::OK();
        }
      }
    }
  }

#ifndef ROCKSDB_LITE
  if (lookup_row_cache) {
    size_t row_idx = 0;

    for (auto miter = table_range.begin(); miter != table_range.end();
         ++miter) {
      std::string& row_cache_entry = row_cache_entries[row_idx++];
      const Slice& user_key = miter->ukey;
      GetContext* get_context = miter->get_context;

      get_context->SetReplayLog(nullptr);
      // Compute row cache key.
      row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
                               user_key.size());
      // Put the replay log in the row cache only if something was found.
      if (s.ok() && !row_cache_entry.empty()) {
        size_t charge =
            row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
        void* row_ptr = new std::string(std::move(row_cache_entry));
        ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                                    SimpleDeleter<std::string>::GetInstance());
      }
    }
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

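// Returns the table's properties, using the pre-loaded reader when one is
// attached to `fd` and otherwise going through the table cache (honoring
// `no_io`).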
Status TableCache::GetTableProperties(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    std::shared_ptr<const TableProperties>* properties,
    const SliceTransform* prefix_extractor, bool no_io) {
  Status s;
  auto table_reader = fd.table_reader;
  // Has the table reader already been pre-loaded?
  if (table_reader) {
    *properties = table_reader->GetTableProperties();

    return s;
  }

  Cache::Handle* table_handle = nullptr;
  s = FindTable(file_options, internal_comparator, fd, &table_handle,
                prefix_extractor, no_io);
  if (!s.ok()) {
    return s;
  }
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  *properties = table->GetTableProperties();
  ReleaseHandle(table_handle);
  return s;
}

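// Returns the approximate memory used by the file's TableReader, or 0 when
// the reader is not already open (no IO is performed here).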
size_t TableCache::GetMemoryUsageByTableReader(
    const FileOptions& file_options,
    const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
    const SliceTransform* prefix_extractor) {
  Status s;
  auto table_reader = fd.table_reader;
  // Has the table reader already been pre-loaded?
  if (table_reader) {
    return table_reader->ApproximateMemoryUsage();
  }

  Cache::Handle* table_handle = nullptr;
  s = FindTable(file_options, internal_comparator, fd, &table_handle,
                prefix_extractor, true /* no_io */);
  if (!s.ok()) {
    return 0;
  }
  assert(table_handle);
  auto table = GetTableReaderFromHandle(table_handle);
  auto ret = table->ApproximateMemoryUsage();
  ReleaseHandle(table_handle);
  return ret;
}

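// Static helper that erases a file's entry from `cache`, e.g. once the file
// has become obsolete.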
void TableCache::Evict(Cache* cache, uint64_t file_number) {
  cache->Erase(GetSliceForFileNumber(&file_number));
}

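// Returns the approximate offset of `key` within the table file, opening the
// table through the cache if no reader is attached to `fd`.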
uint64_t TableCache::ApproximateOffsetOf(
    const Slice& key, const FileDescriptor& fd, TableReaderCaller caller,
    const InternalKeyComparator& internal_comparator,
    const SliceTransform* prefix_extractor) {
  uint64_t result = 0;
  TableReader* table_reader = fd.table_reader;
  Cache::Handle* table_handle = nullptr;
  if (table_reader == nullptr) {
    const bool for_compaction = (caller == TableReaderCaller::kCompaction);
    Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
                         prefix_extractor, false /* no_io */,
                         !for_compaction /* record_read_stats */);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(table_handle);
    }
  }

  if (table_reader != nullptr) {
    result = table_reader->ApproximateOffsetOf(key, caller);
  }
  if (table_handle != nullptr) {
    ReleaseHandle(table_handle);
  }

  return result;
}

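// Returns the approximate size of the data spanning [start, end) in the
// table file, opening the table through the cache if necessary.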
uint64_t TableCache::ApproximateSize(
    const Slice& start, const Slice& end, const FileDescriptor& fd,
    TableReaderCaller caller, const InternalKeyComparator& internal_comparator,
    const SliceTransform* prefix_extractor) {
  uint64_t result = 0;
  TableReader* table_reader = fd.table_reader;
  Cache::Handle* table_handle = nullptr;
  if (table_reader == nullptr) {
    const bool for_compaction = (caller == TableReaderCaller::kCompaction);
    Status s = FindTable(file_options_, internal_comparator, fd, &table_handle,
                         prefix_extractor, false /* no_io */,
                         !for_compaction /* record_read_stats */);
    if (s.ok()) {
      table_reader = GetTableReaderFromHandle(table_handle);
    }
  }

  if (table_reader != nullptr) {
    result = table_reader->ApproximateSize(start, end, caller);
  }
  if (table_handle != nullptr) {
    ReleaseHandle(table_handle);
  }

  return result;
}
}  // namespace ROCKSDB_NAMESPACE