//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_based/block_based_table_builder.h"

#include <assert.h>
#include <stdio.h>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

#include "cache/simple_deleter.h"
#include "db/dbformat.h"
#include "index_builder.h"

#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"

#include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
#include "table/block_based/partitioned_filter_block.h"
#include "table/format.h"
#include "table/table_builder.h"

#include "memory/memory_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/xxhash.h"

namespace ROCKSDB_NAMESPACE {

extern const std::string kHashIndexPrefixesBlock;
extern const std::string kHashIndexPrefixesMetadataBlock;

typedef BlockBasedTableOptions::IndexType IndexType;

// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {

// Create a filter block builder based on its type.
FilterBlockBuilder* CreateFilterBlockBuilder(
    const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
    const FilterBuildingContext& context,
    const bool use_delta_encoding_for_index_values,
    PartitionedIndexBuilder* const p_index_builder) {
  const BlockBasedTableOptions& table_opt = context.table_options;
  if (table_opt.filter_policy == nullptr) return nullptr;

  FilterBitsBuilder* filter_bits_builder =
      BloomFilterPolicy::GetBuilderFromContext(context);
  if (filter_bits_builder == nullptr) {
    return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
                                            table_opt);
  } else {
    if (table_opt.partition_filters) {
      assert(p_index_builder != nullptr);
      // Since it takes a while after the filter builder requests a partition
      // cut until the index builder actually cuts the partition, we use the
      // lower bound as the partition size.
      assert(table_opt.block_size_deviation <= 100);
      auto partition_size =
          static_cast<uint32_t>(((table_opt.metadata_block_size *
                                  (100 - table_opt.block_size_deviation)) +
                                 99) /
                                100);
      partition_size = std::max(partition_size, static_cast<uint32_t>(1));
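      // For example, with metadata_block_size = 4096 and
      // block_size_deviation = 10, this evaluates to
      // ceil(4096 * 90 / 100) = 3687 bytes per filter partition.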
      return new PartitionedFilterBlockBuilder(
          mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
          filter_bits_builder, table_opt.index_block_restart_interval,
          use_delta_encoding_for_index_values, p_index_builder, partition_size);
    } else {
      return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
                                        table_opt.whole_key_filtering,
                                        filter_bits_builder);
    }
  }
}

bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  // Check whether the compressed output is at least 12.5% smaller than the
  // raw input.
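  // For instance, a 4096-byte raw block is considered worth keeping in
  // compressed form only if it compressed to fewer than
  // 4096 - 4096 / 8 = 3584 bytes.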
  return compressed_size < raw_size - (raw_size / 8u);
}

bool CompressBlockInternal(const Slice& raw,
                           const CompressionInfo& compression_info,
                           uint32_t format_version,
                           std::string* compressed_output) {
  // Will return compressed block contents if (1) the compression method is
  // supported in this platform and (2) the compression rate is "good enough".
  switch (compression_info.type()) {
    case kSnappyCompression:
      return Snappy_Compress(compression_info, raw.data(), raw.size(),
                             compressed_output);
    case kZlibCompression:
      return Zlib_Compress(
          compression_info,
          GetCompressFormatForVersion(kZlibCompression, format_version),
          raw.data(), raw.size(), compressed_output);
    case kBZip2Compression:
      return BZip2_Compress(
          compression_info,
          GetCompressFormatForVersion(kBZip2Compression, format_version),
          raw.data(), raw.size(), compressed_output);
    case kLZ4Compression:
      return LZ4_Compress(
          compression_info,
          GetCompressFormatForVersion(kLZ4Compression, format_version),
          raw.data(), raw.size(), compressed_output);
    case kLZ4HCCompression:
      return LZ4HC_Compress(
          compression_info,
          GetCompressFormatForVersion(kLZ4HCCompression, format_version),
          raw.data(), raw.size(), compressed_output);
    case kXpressCompression:
      return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
    case kZSTD:
    case kZSTDNotFinalCompression:
      return ZSTD_Compress(compression_info, raw.data(), raw.size(),
                           compressed_output);
    default:
      // Do not recognize this compression type
      return false;
  }
}

}  // namespace

// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
                    std::string* sampled_output_slow) {
  *type = info.type();

  if (info.type() == kNoCompression && !info.SampleForCompression()) {
    return raw;
  }

  // If requested, we sample one in every N blocks with both a fast and a slow
  // compression algorithm and report the stats. Users can use these stats to
  // decide whether it is worthwhile to enable compression, and they also get
  // a hint about which compression algorithm will be beneficial.
  if (do_sample && info.SampleForCompression() &&
      Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
      sampled_output_fast && sampled_output_slow) {
    // Sampling with a fast compression algorithm
    if (LZ4_Supported() || Snappy_Supported()) {
      CompressionType c =
          LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());

      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
    }

    // Sampling with a slow but high-compression algorithm
    if (ZSTD_Supported() || Zlib_Supported()) {
      CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
      CompressionContext context(c);
      CompressionOptions options;
      CompressionInfo info_tmp(options, context,
                               CompressionDict::GetEmptyDict(), c,
                               info.SampleForCompression());
      CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
    }
  }

  // Actually compress the data
  if (*type != kNoCompression) {
    if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
        GoodCompressionRatio(compressed_output->size(), raw.size())) {
      return *compressed_output;
    }
  }

  // Compression method is not supported, or not good
  // compression ratio, so just fall back to uncompressed form.
  *type = kNoCompression;
  return raw;
}

// kBlockBasedTableMagicNumber was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files, so we declare it extern in the header; but to get the space
// allocated, it must be defined (non-extern) in exactly one place.
const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
// We also support reading and writing legacy block based table format (for
// backwards compatibility)
const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;

// A collector that collects properties of interest to block-based table.
// For now this class looks heavy-weight since we only write one additional
// property.
// But in the foreseeable future, we will add more and more properties that are
// specific to block-based table.
class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
    : public IntTblPropCollector {
 public:
  explicit BlockBasedTablePropertiesCollector(
      BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
      bool prefix_filtering)
      : index_type_(index_type),
        whole_key_filtering_(whole_key_filtering),
        prefix_filtering_(prefix_filtering) {}

  Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
                     uint64_t /*file_size*/) override {
    // Intentionally left blank. Have no interest in collecting stats for
    // individual key/value pairs.
    return Status::OK();
  }

  virtual void BlockAdd(uint64_t /* blockRawBytes */,
                        uint64_t /* blockCompressedBytesFast */,
                        uint64_t /* blockCompressedBytesSlow */) override {
    // Intentionally left blank. No interest in collecting stats for
    // blocks.
    return;
  }

  Status Finish(UserCollectedProperties* properties) override {
    std::string val;
    PutFixed32(&val, static_cast<uint32_t>(index_type_));
    properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
    properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
                        whole_key_filtering_ ? kPropTrue : kPropFalse});
    properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
                        prefix_filtering_ ? kPropTrue : kPropFalse});
    return Status::OK();
  }

  // The name of the properties collector can be used for debugging purposes.
  const char* Name() const override {
    return "BlockBasedTablePropertiesCollector";
  }

  UserCollectedProperties GetReadableProperties() const override {
    // Intentionally left blank.
    return UserCollectedProperties();
  }

 private:
  BlockBasedTableOptions::IndexType index_type_;
  bool whole_key_filtering_;
  bool prefix_filtering_;
};

struct BlockBasedTableBuilder::Rep {
  const ImmutableCFOptions ioptions;
  const MutableCFOptions moptions;
  const BlockBasedTableOptions table_options;
  const InternalKeyComparator& internal_comparator;
  WritableFileWriter* file;
  uint64_t offset = 0;
  Status status;
  IOStatus io_status;
  size_t alignment;
  BlockBuilder data_block;
  // Buffers uncompressed data blocks and keys to replay later. Needed when
  // compression dictionary is enabled so we can finalize the dictionary before
  // compressing any data blocks.
  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
  // blocks as it's redundant, but it's easier to implement for now.
  std::vector<std::pair<std::string, std::vector<std::string>>>
      data_block_and_keys_buffers;
  BlockBuilder range_del_block;

  InternalKeySliceTransform internal_prefix_transform;
  std::unique_ptr<IndexBuilder> index_builder;
  PartitionedIndexBuilder* p_index_builder_ = nullptr;

  std::string last_key;
  CompressionType compression_type;
  uint64_t sample_for_compression;
  CompressionOptions compression_opts;
  std::unique_ptr<CompressionDict> compression_dict;
  CompressionContext compression_ctx;
  std::unique_ptr<UncompressionContext> verify_ctx;
  std::unique_ptr<UncompressionDict> verify_dict;

  size_t data_begin_offset = 0;

  TableProperties props;

  // States of the builder.
  //
  // - `kBuffered`: This is the initial state where zero or more data blocks are
  //   accumulated uncompressed in-memory. From this state, call
  //   `EnterUnbuffered()` to finalize the compression dictionary if enabled,
  //   compress/write out any buffered blocks, and proceed to the `kUnbuffered`
  //   state.
  //
  // - `kUnbuffered`: This is the state when compression dictionary is finalized
  //   either because it wasn't enabled in the first place or it's been created
  //   from sampling previously buffered data. In this state, blocks are simply
  //   compressed/written out as they fill up. From this state, call `Finish()`
  //   to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
  //   the partially created file.
  //
  // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
  //   called, so the table builder is no longer usable. We must be in this
  //   state by the time the destructor runs.
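  //
  // In short, the state machine is: kBuffered --EnterUnbuffered()-->
  // kUnbuffered --Finish()/Abandon()--> kClosed.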
  enum class State {
    kBuffered,
    kUnbuffered,
    kClosed,
  };
  State state;

  const bool use_delta_encoding_for_index_values;
  std::unique_ptr<FilterBlockBuilder> filter_builder;
  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  size_t compressed_cache_key_prefix_size;

  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;
  int level_at_creation;
  uint32_t column_family_id;
  const std::string& column_family_name;
  uint64_t creation_time = 0;
  uint64_t oldest_key_time = 0;
  const uint64_t target_file_size;
  uint64_t file_creation_time = 0;

  std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;

  Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
      const BlockBasedTableOptions& table_opt,
      const InternalKeyComparator& icomparator,
      const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
          int_tbl_prop_collector_factories,
      uint32_t _column_family_id, WritableFileWriter* f,
      const CompressionType _compression_type,
      const uint64_t _sample_for_compression,
      const CompressionOptions& _compression_opts, const bool skip_filters,
      const int _level_at_creation, const std::string& _column_family_name,
      const uint64_t _creation_time, const uint64_t _oldest_key_time,
      const uint64_t _target_file_size, const uint64_t _file_creation_time)
      : ioptions(_ioptions),
        moptions(_moptions),
        table_options(table_opt),
        internal_comparator(icomparator),
        file(f),
        alignment(table_options.block_align
                      ? std::min(table_options.block_size, kDefaultPageSize)
                      : 0),
        data_block(table_options.block_restart_interval,
                   table_options.use_delta_encoding,
                   false /* use_value_delta_encoding */,
                   icomparator.user_comparator()
                           ->CanKeysWithDifferentByteContentsBeEqual()
                       ? BlockBasedTableOptions::kDataBlockBinarySearch
                       : table_options.data_block_index_type,
                   table_options.data_block_hash_table_util_ratio),
        range_del_block(1 /* block_restart_interval */),
        internal_prefix_transform(_moptions.prefix_extractor.get()),
        compression_type(_compression_type),
        sample_for_compression(_sample_for_compression),
        compression_opts(_compression_opts),
        compression_dict(),
        compression_ctx(_compression_type),
        verify_dict(),
        state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
                                                     : State::kUnbuffered),
        use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
                                            !table_opt.block_align),
        compressed_cache_key_prefix_size(0),
        flush_block_policy(
            table_options.flush_block_policy_factory->NewFlushBlockPolicy(
                table_options, data_block)),
        level_at_creation(_level_at_creation),
        column_family_id(_column_family_id),
        column_family_name(_column_family_name),
        creation_time(_creation_time),
        oldest_key_time(_oldest_key_time),
        target_file_size(_target_file_size),
        file_creation_time(_file_creation_time) {
    if (table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
          &internal_comparator, use_delta_encoding_for_index_values,
          table_options);
      index_builder.reset(p_index_builder_);
    } else {
      index_builder.reset(IndexBuilder::CreateIndexBuilder(
          table_options.index_type, &internal_comparator,
          &this->internal_prefix_transform, use_delta_encoding_for_index_values,
          table_options));
    }
    if (skip_filters) {
      filter_builder = nullptr;
    } else {
      FilterBuildingContext context(table_options);
      context.column_family_name = column_family_name;
      context.compaction_style = ioptions.compaction_style;
      context.level_at_creation = level_at_creation;
      context.info_log = ioptions.info_log;
      filter_builder.reset(CreateFilterBlockBuilder(
          ioptions, moptions, context, use_delta_encoding_for_index_values,
          p_index_builder_));
    }

    for (auto& collector_factories : *int_tbl_prop_collector_factories) {
      table_properties_collectors.emplace_back(
          collector_factories->CreateIntTblPropCollector(column_family_id));
    }
    table_properties_collectors.emplace_back(
        new BlockBasedTablePropertiesCollector(
            table_options.index_type, table_options.whole_key_filtering,
            _moptions.prefix_extractor != nullptr));
    if (table_options.verify_compression) {
      verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
                                                compression_type));
    }
  }

  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  ~Rep() {}
};

BlockBasedTableBuilder::BlockBasedTableBuilder(
    const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
    const BlockBasedTableOptions& table_options,
    const InternalKeyComparator& internal_comparator,
    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories,
    uint32_t column_family_id, WritableFileWriter* file,
    const CompressionType compression_type,
    const uint64_t sample_for_compression,
    const CompressionOptions& compression_opts, const bool skip_filters,
    const std::string& column_family_name, const int level_at_creation,
    const uint64_t creation_time, const uint64_t oldest_key_time,
    const uint64_t target_file_size, const uint64_t file_creation_time) {
  BlockBasedTableOptions sanitized_table_options(table_options);
  if (sanitized_table_options.format_version == 0 &&
      sanitized_table_options.checksum != kCRC32c) {
    ROCKS_LOG_WARN(
        ioptions.info_log,
        "Silently converting format_version to 1 because checksum is "
        "non-default");
    // silently convert format_version to 1 to keep consistent with current
    // behavior
    sanitized_table_options.format_version = 1;
  }

  rep_ = new Rep(ioptions, moptions, sanitized_table_options,
                 internal_comparator, int_tbl_prop_collector_factories,
                 column_family_id, file, compression_type,
                 sample_for_compression, compression_opts, skip_filters,
                 level_at_creation, column_family_name, creation_time,
                 oldest_key_time, target_file_size, file_creation_time);

  if (rep_->filter_builder != nullptr) {
    rep_->filter_builder->StartBlock(0);
  }
  if (table_options.block_cache_compressed.get() != nullptr) {
    BlockBasedTable::GenerateCachePrefix(
        table_options.block_cache_compressed.get(), file->writable_file(),
        &rep_->compressed_cache_key_prefix[0],
        &rep_->compressed_cache_key_prefix_size);
  }
}

BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  // Catch errors where caller forgot to call Finish()
  assert(rep_->state == Rep::State::kClosed);
  delete rep_;
}

void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  ValueType value_type = ExtractValueType(key);
  if (IsValueType(value_type)) {
#ifndef NDEBUG
    if (r->props.num_entries > r->props.num_range_deletions) {
      assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
    }
#endif  // NDEBUG

    auto should_flush = r->flush_block_policy->Update(key, value);
    if (should_flush) {
      assert(!r->data_block.empty());
      Flush();

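      // If the buffered raw data already exceeds the target file size, stop
      // buffering: finalize the compression dictionary from what has been
      // collected so far and start writing blocks out directly.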
      if (r->state == Rep::State::kBuffered &&
          r->data_begin_offset > r->target_file_size) {
        EnterUnbuffered();
      }

      // Add item to index block.
      // We do not emit the index entry for a block until we have seen the
      // first key for the next data block.  This allows us to use shorter
      // keys in the index block.  For example, consider a block boundary
      // between the keys "the quick brown fox" and "the who".  We can use
      // "the r" as the key for the index block entry since it is >= all
      // entries in the first block and < all entries in subsequent
      // blocks.
      if (ok() && r->state == Rep::State::kUnbuffered) {
        r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
      }
    }

    // Note: PartitionedFilterBlockBuilder requires key being added to filter
    // builder after being added to index builder.
    if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
      size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
      r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
    }

    r->last_key.assign(key.data(), key.size());
    r->data_block.Add(key, value);
    if (r->state == Rep::State::kBuffered) {
      // Buffer keys to be replayed during `Finish()` once compression
      // dictionary has been finalized.
      if (r->data_block_and_keys_buffers.empty() || should_flush) {
        r->data_block_and_keys_buffers.emplace_back();
      }
      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
    } else {
      r->index_builder->OnKeyAdded(key);
    }
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);

  } else if (value_type == kTypeRangeDeletion) {
    r->range_del_block.Add(key, value);
    NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                      r->table_properties_collectors,
                                      r->ioptions.info_log);
  } else {
    assert(false);
  }

  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();
  if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
    r->props.num_deletions++;
  } else if (value_type == kTypeRangeDeletion) {
    r->props.num_deletions++;
    r->props.num_range_deletions++;
  } else if (value_type == kTypeMerge) {
    r->props.num_merge_operands++;
  }
}

void BlockBasedTableBuilder::Flush() {
  Rep* r = rep_;
  assert(rep_->state != Rep::State::kClosed);
  if (!ok()) return;
  if (r->data_block.empty()) return;
  WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
}

void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  WriteBlock(block->Finish(), handle, is_data_block);
  block->Reset();
}

void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                        BlockHandle* handle,
                                        bool is_data_block) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
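  // In other words, each block's payload is followed by a kBlockTrailerSize
  // (5-byte) trailer: one compression-type byte plus a 4-byte checksum that,
  // as WriteRawBlock below shows, also covers the type byte.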
  assert(ok());
  Rep* r = rep_;

  auto type = r->compression_type;
  uint64_t sample_for_compression = r->sample_for_compression;
  Slice block_contents;
  bool abort_compression = false;

  StopWatchNano timer(
      r->ioptions.env,
      ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));

  if (r->state == Rep::State::kBuffered) {
    assert(is_data_block);
    assert(!r->data_block_and_keys_buffers.empty());
    r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
    r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
    return;
  }

  if (raw_block_contents.size() < kCompressionSizeLimit) {
    const CompressionDict* compression_dict;
    if (!is_data_block || r->compression_dict == nullptr) {
      compression_dict = &CompressionDict::GetEmptyDict();
    } else {
      compression_dict = r->compression_dict.get();
    }
    assert(compression_dict != nullptr);
    CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
                                     *compression_dict, type,
                                     sample_for_compression);

    std::string sampled_output_fast;
    std::string sampled_output_slow;
    block_contents = CompressBlock(
        raw_block_contents, compression_info, &type,
        r->table_options.format_version, is_data_block /* do_sample */,
        &r->compressed_output, &sampled_output_fast, &sampled_output_slow);

    // notify collectors on block add
    NotifyCollectTableCollectorsOnBlockAdd(
        r->table_properties_collectors, raw_block_contents.size(),
        sampled_output_fast.size(), sampled_output_slow.size());

    // Some of the compression algorithms are known to be unreliable. If
    // the verify_compression flag is set then try to de-compress the
    // compressed data and compare to the input.
    if (type != kNoCompression && r->table_options.verify_compression) {
      // Retrieve the uncompressed contents into a new buffer
      const UncompressionDict* verify_dict;
      if (!is_data_block || r->verify_dict == nullptr) {
        verify_dict = &UncompressionDict::GetEmptyDict();
      } else {
        verify_dict = r->verify_dict.get();
      }
      assert(verify_dict != nullptr);
      BlockContents contents;
      UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
                                           r->compression_type);
      Status stat = UncompressBlockContentsForCompressionType(
          uncompression_info, block_contents.data(), block_contents.size(),
          &contents, r->table_options.format_version, r->ioptions);

      if (stat.ok()) {
        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
        if (!compressed_ok) {
          // The result of the compression was invalid. abort.
          abort_compression = true;
          ROCKS_LOG_ERROR(r->ioptions.info_log,
                          "Decompressed block did not match raw block");
          r->status =
              Status::Corruption("Decompressed block did not match raw block");
        }
      } else {
        // Decompression reported an error. abort.
        r->status = Status::Corruption("Could not decompress");
        abort_compression = true;
      }
    }
  } else {
    // Block is too big to be compressed.
    abort_compression = true;
  }

  // Abort compression if the block is too big, or did not pass
  // verification.
  if (abort_compression) {
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
    type = kNoCompression;
    block_contents = raw_block_contents;
  } else if (type != kNoCompression) {
    if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
      RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
                            timer.ElapsedNanos());
    }
    RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
                      raw_block_contents.size());
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
  } else if (type != r->compression_type) {
    RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
  }

  WriteRawBlock(block_contents, type, handle, is_data_block);
  r->compressed_output.clear();
  if (is_data_block) {
    if (r->filter_builder != nullptr) {
      r->filter_builder->StartBlock(r->offset);
    }
    r->props.data_size = r->offset;
    ++r->props.num_data_blocks;
  }
}

void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                           CompressionType type,
                                           BlockHandle* handle,
                                           bool is_data_block) {
  Rep* r = rep_;
  StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  assert(r->status.ok());
  assert(r->io_status.ok());
  r->io_status = r->file->Append(block_contents);
  if (r->io_status.ok()) {
    char trailer[kBlockTrailerSize];
    trailer[0] = type;
    char* trailer_without_type = trailer + 1;
    switch (r->table_options.checksum) {
      case kNoChecksum:
        EncodeFixed32(trailer_without_type, 0);
        break;
      case kCRC32c: {
        auto crc = crc32c::Value(block_contents.data(), block_contents.size());
        crc = crc32c::Extend(crc, trailer, 1);  // Extend to cover block type
        EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
        break;
      }
      case kxxHash: {
        XXH32_state_t* const state = XXH32_createState();
        XXH32_reset(state, 0);
        XXH32_update(state, block_contents.data(),
                     static_cast<uint32_t>(block_contents.size()));
        XXH32_update(state, trailer, 1);  // Extend to cover block type
        EncodeFixed32(trailer_without_type, XXH32_digest(state));
        XXH32_freeState(state);
        break;
      }
      case kxxHash64: {
        XXH64_state_t* const state = XXH64_createState();
        XXH64_reset(state, 0);
        XXH64_update(state, block_contents.data(),
                     static_cast<uint32_t>(block_contents.size()));
        XXH64_update(state, trailer, 1);  // Extend to cover block type
        EncodeFixed32(
            trailer_without_type,
            static_cast<uint32_t>(XXH64_digest(state) &  // lower 32 bits
                                  uint64_t{0xffffffff}));
        XXH64_freeState(state);
        break;
      }
    }

    assert(r->io_status.ok());
    TEST_SYNC_POINT_CALLBACK(
        "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
        static_cast<char*>(trailer));
    r->io_status = r->file->Append(Slice(trailer, kBlockTrailerSize));
    if (r->io_status.ok()) {
      r->status = InsertBlockInCache(block_contents, type, handle);
    }
    if (r->status.ok() && r->io_status.ok()) {
      r->offset += block_contents.size() + kBlockTrailerSize;
      if (r->table_options.block_align && is_data_block) {
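        // r->alignment is std::min(block_size, kDefaultPageSize); assuming it
        // is a power of two (as block_align expects), the masking below
        // computes the padding needed to bring the block plus its trailer up
        // to the next alignment boundary.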
        size_t pad_bytes =
            (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
                             (r->alignment - 1))) &
            (r->alignment - 1);
        r->io_status = r->file->Pad(pad_bytes);
        if (r->io_status.ok()) {
          r->offset += pad_bytes;
        }
      }
    }
  }
  r->status = r->io_status;
}

Status BlockBasedTableBuilder::status() const { return rep_->status; }

IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; }

//
// Make a copy of the block contents and insert into compressed block cache
//
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
                                                  const CompressionType type,
                                                  const BlockHandle* handle) {
  Rep* r = rep_;
  Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();

  if (type != kNoCompression && block_cache_compressed != nullptr) {
    size_t size = block_contents.size();

    auto ubuf =
        AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
    memcpy(ubuf.get(), block_contents.data(), size);
    ubuf[size] = type;

    BlockContents* block_contents_to_cache =
        new BlockContents(std::move(ubuf), size);
#ifndef NDEBUG
    block_contents_to_cache->is_raw_block = true;
#endif  // NDEBUG

    // make cache key by appending the file offset to the cache prefix id
    char* end = EncodeVarint64(
        r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
        handle->offset());
    Slice key(r->compressed_cache_key_prefix,
              static_cast<size_t>(end - r->compressed_cache_key_prefix));

    // Insert into compressed block cache.
    block_cache_compressed->Insert(
        key, block_contents_to_cache,
        block_contents_to_cache->ApproximateMemoryUsage(),
        SimpleDeleter<BlockContents>::GetInstance());

    // Invalidate OS cache.
    r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
  }
  return Status::OK();
}

void BlockBasedTableBuilder::WriteFilterBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle filter_block_handle;
  bool empty_filter_block = (rep_->filter_builder == nullptr ||
                             rep_->filter_builder->NumAdded() == 0);
  if (ok() && !empty_filter_block) {
    Status s = Status::Incomplete();
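    // For partitioned filters, Finish() is expected to return
    // Status::Incomplete() while partitions remain, so keep flushing
    // partitions until it returns OK.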
    while (ok() && s.IsIncomplete()) {
      Slice filter_content =
          rep_->filter_builder->Finish(filter_block_handle, &s);
      assert(s.ok() || s.IsIncomplete());
      rep_->props.filter_size += filter_content.size();
      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
    }
  }
  if (ok() && !empty_filter_block) {
    // Add mapping from "<filter_block_prefix>.Name" to location
    // of filter data.
    std::string key;
    if (rep_->filter_builder->IsBlockBased()) {
      key = BlockBasedTable::kFilterBlockPrefix;
    } else {
      key = rep_->table_options.partition_filters
                ? BlockBasedTable::kPartitionedFilterBlockPrefix
                : BlockBasedTable::kFullFilterBlockPrefix;
    }
    key.append(rep_->table_options.filter_policy->Name());
    meta_index_builder->Add(key, filter_block_handle);
  }
}

void BlockBasedTableBuilder::WriteIndexBlock(
    MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
  IndexBuilder::IndexBlocks index_blocks;
  auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
  if (index_builder_status.IsIncomplete()) {
    // If we have more than one index partition, then meta_blocks are not
    // supported for the index. Currently meta_blocks are used only by
    // HashIndexBuilder, which is not multi-partition.
    assert(index_blocks.meta_blocks.empty());
  } else if (ok() && !index_builder_status.ok()) {
    rep_->status = index_builder_status;
  }
  if (ok()) {
    for (const auto& item : index_blocks.meta_blocks) {
      BlockHandle block_handle;
      WriteBlock(item.second, &block_handle, false /* is_data_block */);
      if (!ok()) {
        break;
      }
      meta_index_builder->Add(item.first, block_handle);
    }
  }
  if (ok()) {
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
    } else {
      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                    index_block_handle);
    }
  }
  // If there are more index partitions, finish them and write them out
  Status s = index_builder_status;
  while (ok() && s.IsIncomplete()) {
    s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
    if (!s.ok() && !s.IsIncomplete()) {
      rep_->status = s;
      return;
    }
    if (rep_->table_options.enable_index_compression) {
      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
    } else {
      WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
                    index_block_handle);
    }
    // The last index_block_handle will be for the partition index block
  }
}

void BlockBasedTableBuilder::WritePropertiesBlock(
    MetaIndexBuilder* meta_index_builder) {
  BlockHandle properties_block_handle;
  if (ok()) {
    PropertyBlockBuilder property_block_builder;
    rep_->props.column_family_id = rep_->column_family_id;
    rep_->props.column_family_name = rep_->column_family_name;
    rep_->props.filter_policy_name =
        rep_->table_options.filter_policy != nullptr
            ? rep_->table_options.filter_policy->Name()
            : "";
    rep_->props.index_size =
        rep_->index_builder->IndexSize() + kBlockTrailerSize;
    rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
                                      ? rep_->ioptions.user_comparator->Name()
                                      : "nullptr";
    rep_->props.merge_operator_name =
        rep_->ioptions.merge_operator != nullptr
            ? rep_->ioptions.merge_operator->Name()
            : "nullptr";
    rep_->props.compression_name =
        CompressionTypeToString(rep_->compression_type);
    rep_->props.compression_options =
        CompressionOptionsToString(rep_->compression_opts);
    rep_->props.prefix_extractor_name =
        rep_->moptions.prefix_extractor != nullptr
            ? rep_->moptions.prefix_extractor->Name()
            : "nullptr";

    std::string property_collectors_names = "[";
    for (size_t i = 0;
         i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
      if (i != 0) {
        property_collectors_names += ",";
      }
      property_collectors_names +=
          rep_->ioptions.table_properties_collector_factories[i]->Name();
    }
    property_collectors_names += "]";
    rep_->props.property_collectors_names = property_collectors_names;
    if (rep_->table_options.index_type ==
        BlockBasedTableOptions::kTwoLevelIndexSearch) {
      assert(rep_->p_index_builder_ != nullptr);
      rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
      rep_->props.top_level_index_size =
          rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
    }
    rep_->props.index_key_is_user_key =
        !rep_->index_builder->seperator_is_key_plus_seq();
    rep_->props.index_value_is_delta_encoded =
        rep_->use_delta_encoding_for_index_values;
    rep_->props.creation_time = rep_->creation_time;
    rep_->props.oldest_key_time = rep_->oldest_key_time;
    rep_->props.file_creation_time = rep_->file_creation_time;

    // Add basic properties
    property_block_builder.AddTableProperty(rep_->props);

    // Add user collected properties
    NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
                                         rep_->ioptions.info_log,
                                         &property_block_builder);

    WriteRawBlock(property_block_builder.Finish(), kNoCompression,
                  &properties_block_handle);
  }
  if (ok()) {
#ifndef NDEBUG
    {
      uint64_t props_block_offset = properties_block_handle.offset();
      uint64_t props_block_size = properties_block_handle.size();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
          &props_block_offset);
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
          &props_block_size);
    }
#endif  // !NDEBUG
    meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
  }
}

void BlockBasedTableBuilder::WriteCompressionDictBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (rep_->compression_dict != nullptr &&
      rep_->compression_dict->GetRawDict().size()) {
    BlockHandle compression_dict_block_handle;
    if (ok()) {
      WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
                    &compression_dict_block_handle);
#ifndef NDEBUG
      Slice compression_dict = rep_->compression_dict->GetRawDict();
      TEST_SYNC_POINT_CALLBACK(
          "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
          &compression_dict);
#endif  // NDEBUG
    }
    if (ok()) {
      meta_index_builder->Add(kCompressionDictBlock,
                              compression_dict_block_handle);
    }
  }
}

void BlockBasedTableBuilder::WriteRangeDelBlock(
    MetaIndexBuilder* meta_index_builder) {
  if (ok() && !rep_->range_del_block.empty()) {
    BlockHandle range_del_block_handle;
    WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
                  &range_del_block_handle);
    meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
  }
}

void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
                                         BlockHandle& index_block_handle) {
  Rep* r = rep_;
  // No need to write out new footer if we're using default checksum.
  // We write the legacy magic number because we want old versions of RocksDB
  // to be able to read files generated with a new release (just in case
  // somebody wants to roll back after an upgrade).
  // TODO(icanadi) at some point in the future, when we're absolutely sure
  // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
  // number and always write new table files with new magic number
  bool legacy = (r->table_options.format_version == 0);
  // this is guaranteed by BlockBasedTableBuilder's constructor
  assert(r->table_options.checksum == kCRC32c ||
         r->table_options.format_version != 0);
  Footer footer(
      legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
      r->table_options.format_version);
  footer.set_metaindex_handle(metaindex_block_handle);
  footer.set_index_handle(index_block_handle);
  footer.set_checksum(r->table_options.checksum);
  std::string footer_encoding;
  footer.EncodeTo(&footer_encoding);
  assert(r->status.ok());
  assert(r->io_status.ok());
  r->io_status = r->file->Append(footer_encoding);
  if (r->io_status.ok()) {
    r->offset += footer_encoding.size();
  }
  r->status = r->io_status;
}

void BlockBasedTableBuilder::EnterUnbuffered() {
  Rep* r = rep_;
  assert(r->state == Rep::State::kBuffered);
  r->state = Rep::State::kUnbuffered;
  const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                  ? r->compression_opts.zstd_max_train_bytes
                                  : r->compression_opts.max_dict_bytes;
  Random64 generator{r->creation_time};
  std::string compression_dict_samples;
  std::vector<size_t> compression_dict_sample_lens;
  if (!r->data_block_and_keys_buffers.empty()) {
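    // Sample buffered data blocks uniformly at random (with replacement),
    // taking a prefix of each chosen block, until kSampleBytes of sample
    // data have been collected.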
    while (compression_dict_samples.size() < kSampleBytes) {
      size_t rand_idx =
          static_cast<size_t>(
              generator.Uniform(r->data_block_and_keys_buffers.size()));
      size_t copy_len =
          std::min(kSampleBytes - compression_dict_samples.size(),
                   r->data_block_and_keys_buffers[rand_idx].first.size());
      compression_dict_samples.append(
          r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
      compression_dict_sample_lens.emplace_back(copy_len);
    }
  }

  // The final data block has been flushed, so now we can generate the
  // dictionary from the samples. It is OK if compression_dict_samples is
  // empty; we will just get an empty dictionary.
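  // When zstd_max_train_bytes > 0, the samples are fed to ZSTD's dictionary
  // trainer below; otherwise the concatenated samples themselves (capped at
  // max_dict_bytes) are used directly as the dictionary.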
  std::string dict;
  if (r->compression_opts.zstd_max_train_bytes > 0) {
    dict = ZSTD_TrainDictionary(compression_dict_samples,
                                compression_dict_sample_lens,
                                r->compression_opts.max_dict_bytes);
  } else {
    dict = std::move(compression_dict_samples);
  }
  r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
                                                r->compression_opts.level));
  r->verify_dict.reset(new UncompressionDict(
      dict, r->compression_type == kZSTD ||
                r->compression_type == kZSTDNotFinalCompression));

  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
    const auto& data_block = r->data_block_and_keys_buffers[i].first;
    auto& keys = r->data_block_and_keys_buffers[i].second;
    assert(!data_block.empty());
    assert(!keys.empty());

    for (const auto& key : keys) {
      if (r->filter_builder != nullptr) {
        size_t ts_sz =
            r->internal_comparator.user_comparator()->timestamp_size();
        r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
      }
      r->index_builder->OnKeyAdded(key);
    }
    WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
    if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
      Slice first_key_in_next_block =
          r->data_block_and_keys_buffers[i + 1].second.front();
      Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
      r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
                                      r->pending_handle);
    }
  }
  r->data_block_and_keys_buffers.clear();
}

Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  assert(r->state != Rep::State::kClosed);
  bool empty_data_block = r->data_block.empty();
  Flush();
  if (r->state == Rep::State::kBuffered) {
    EnterUnbuffered();
  }
  // To make sure properties block is able to keep the accurate size of index
  // block, we will finish writing all index entries first.
  if (ok() && !empty_data_block) {
    r->index_builder->AddIndexEntry(
        &r->last_key, nullptr /* no next data block */, r->pending_handle);
  }

  // Write meta blocks, metaindex block and footer in the following order.
  //    1. [meta block: filter]
  //    2. [meta block: index]
  //    3. [meta block: compression dictionary]
  //    4. [meta block: range deletion tombstone]
  //    5. [meta block: properties]
  //    6. [metaindex block]
  //    7. Footer
  BlockHandle metaindex_block_handle, index_block_handle;
  MetaIndexBuilder meta_index_builder;
  WriteFilterBlock(&meta_index_builder);
  WriteIndexBlock(&meta_index_builder, &index_block_handle);
  WriteCompressionDictBlock(&meta_index_builder);
  WriteRangeDelBlock(&meta_index_builder);
  WritePropertiesBlock(&meta_index_builder);
  if (ok()) {
    // flush the meta index block
    WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
                  &metaindex_block_handle);
  }
  if (ok()) {
    WriteFooter(metaindex_block_handle, index_block_handle);
  }
  r->state = Rep::State::kClosed;
  return r->status;
}

void BlockBasedTableBuilder::Abandon() {
  assert(rep_->state != Rep::State::kClosed);
  rep_->state = Rep::State::kClosed;
}

uint64_t BlockBasedTableBuilder::NumEntries() const {
  return rep_->props.num_entries;
}

uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }

bool BlockBasedTableBuilder::NeedCompact() const {
  for (const auto& collector : rep_->table_properties_collectors) {
    if (collector->NeedCompact()) {
      return true;
    }
  }
  return false;
}

TableProperties BlockBasedTableBuilder::GetTableProperties() const {
  TableProperties ret = rep_->props;
  for (const auto& collector : rep_->table_properties_collectors) {
    for (const auto& prop : collector->GetReadableProperties()) {
      ret.readable_properties.insert(prop);
    }
    collector->Finish(&ret.user_collected_properties);
  }
  return ret;
}

std::string BlockBasedTableBuilder::GetFileChecksum() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksum();
  } else {
    return kUnknownFileChecksum;
  }
}

const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
  if (rep_->file != nullptr) {
    return rep_->file->GetFileChecksumFuncName();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
    "partitionedfilter.";
}  // namespace ROCKSDB_NAMESPACE