1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "table/block_based/block_based_table_builder.h"
11
12 #include <assert.h>
13 #include <stdio.h>
14 #include <list>
15 #include <map>
16 #include <memory>
17 #include <string>
18 #include <unordered_map>
19 #include <utility>
20
21 #include "cache/simple_deleter.h"
22 #include "db/dbformat.h"
23 #include "index_builder.h"
24
25 #include "rocksdb/cache.h"
26 #include "rocksdb/comparator.h"
27 #include "rocksdb/env.h"
28 #include "rocksdb/flush_block_policy.h"
29 #include "rocksdb/merge_operator.h"
30 #include "rocksdb/table.h"
31
32 #include "table/block_based/block.h"
33 #include "table/block_based/block_based_filter_block.h"
34 #include "table/block_based/block_based_table_factory.h"
35 #include "table/block_based/block_based_table_reader.h"
36 #include "table/block_based/block_builder.h"
37 #include "table/block_based/filter_block.h"
38 #include "table/block_based/filter_policy_internal.h"
39 #include "table/block_based/full_filter_block.h"
40 #include "table/block_based/partitioned_filter_block.h"
41 #include "table/format.h"
42 #include "table/table_builder.h"
43
44 #include "memory/memory_allocator.h"
45 #include "util/coding.h"
46 #include "util/compression.h"
47 #include "util/crc32c.h"
48 #include "util/stop_watch.h"
49 #include "util/string_util.h"
50 #include "util/xxhash.h"
51
52 namespace ROCKSDB_NAMESPACE {
53
54 extern const std::string kHashIndexPrefixesBlock;
55 extern const std::string kHashIndexPrefixesMetadataBlock;
56
57 typedef BlockBasedTableOptions::IndexType IndexType;
58
59 // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
60 namespace {
61
62 // Create a filter block builder based on its type.
CreateFilterBlockBuilder(const ImmutableCFOptions &,const MutableCFOptions & mopt,const FilterBuildingContext & context,const bool use_delta_encoding_for_index_values,PartitionedIndexBuilder * const p_index_builder)63 FilterBlockBuilder* CreateFilterBlockBuilder(
64 const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
65 const FilterBuildingContext& context,
66 const bool use_delta_encoding_for_index_values,
67 PartitionedIndexBuilder* const p_index_builder) {
68 const BlockBasedTableOptions& table_opt = context.table_options;
69 if (table_opt.filter_policy == nullptr) return nullptr;
70
71 FilterBitsBuilder* filter_bits_builder =
72 BloomFilterPolicy::GetBuilderFromContext(context);
73 if (filter_bits_builder == nullptr) {
74 return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(),
75 table_opt);
76 } else {
77 if (table_opt.partition_filters) {
78 assert(p_index_builder != nullptr);
79 // Since after partition cut request from filter builder it takes time
80 // until index builder actully cuts the partition, we take the lower bound
81 // as partition size.
82 assert(table_opt.block_size_deviation <= 100);
83 auto partition_size =
84 static_cast<uint32_t>(((table_opt.metadata_block_size *
85 (100 - table_opt.block_size_deviation)) +
86 99) /
87 100);
88 partition_size = std::max(partition_size, static_cast<uint32_t>(1));
89 return new PartitionedFilterBlockBuilder(
90 mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
91 filter_bits_builder, table_opt.index_block_restart_interval,
92 use_delta_encoding_for_index_values, p_index_builder, partition_size);
93 } else {
94 return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
95 table_opt.whole_key_filtering,
96 filter_bits_builder);
97 }
98 }
99 }
100
// Returns true when compression was worthwhile, i.e. when `compressed_size`
// is strictly smaller than `raw_size` minus one eighth (12.5%, integer
// division) of `raw_size`.
bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t exclusive_upper_bound = raw_size - (raw_size / 8u);
  return compressed_size < exclusive_upper_bound;
}
105
CompressBlockInternal(const Slice & raw,const CompressionInfo & compression_info,uint32_t format_version,std::string * compressed_output)106 bool CompressBlockInternal(const Slice& raw,
107 const CompressionInfo& compression_info,
108 uint32_t format_version,
109 std::string* compressed_output) {
110 // Will return compressed block contents if (1) the compression method is
111 // supported in this platform and (2) the compression rate is "good enough".
112 switch (compression_info.type()) {
113 case kSnappyCompression:
114 return Snappy_Compress(compression_info, raw.data(), raw.size(),
115 compressed_output);
116 case kZlibCompression:
117 return Zlib_Compress(
118 compression_info,
119 GetCompressFormatForVersion(kZlibCompression, format_version),
120 raw.data(), raw.size(), compressed_output);
121 case kBZip2Compression:
122 return BZip2_Compress(
123 compression_info,
124 GetCompressFormatForVersion(kBZip2Compression, format_version),
125 raw.data(), raw.size(), compressed_output);
126 case kLZ4Compression:
127 return LZ4_Compress(
128 compression_info,
129 GetCompressFormatForVersion(kLZ4Compression, format_version),
130 raw.data(), raw.size(), compressed_output);
131 case kLZ4HCCompression:
132 return LZ4HC_Compress(
133 compression_info,
134 GetCompressFormatForVersion(kLZ4HCCompression, format_version),
135 raw.data(), raw.size(), compressed_output);
136 case kXpressCompression:
137 return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
138 case kZSTD:
139 case kZSTDNotFinalCompression:
140 return ZSTD_Compress(compression_info, raw.data(), raw.size(),
141 compressed_output);
142 default:
143 // Do not recognize this compression type
144 return false;
145 }
146 }
147
148 } // namespace
149
150 // format_version is the block format as defined in include/rocksdb/table.h
// Compresses `raw` according to `info` and reports the outcome via `*type`.
//
// Returns a Slice over `*compressed_output` (with `*type` left at
// `info.type()`) when compression succeeded with a good enough ratio.
// Otherwise returns `raw` itself; `*type` is then kNoCompression, except on
// the early return below where it keeps the value of `info.type()`, which is
// kNoCompression in that case anyway.
//
// When `do_sample` is set and sampling is enabled in `info`, roughly one in
// every `info.SampleForCompression()` calls additionally compresses `raw`
// with a fast and with a slow algorithm into `*sampled_output_fast` and
// `*sampled_output_slow` (both must be non-null for sampling to happen).
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
                    CompressionType* type, uint32_t format_version,
                    bool do_sample, std::string* compressed_output,
                    std::string* sampled_output_fast,
                    std::string* sampled_output_slow) {
  const CompressionType requested = info.type();
  *type = requested;

  if (requested == kNoCompression && !info.SampleForCompression()) {
    return raw;
  }

  // If requested, we sample one in every N blocks with a fast and a slow
  // compression algorithm and report the stats. Users can use these stats to
  // decide whether enabling compression is worthwhile, and they also get a
  // hint about which compression algorithm will be beneficial.
  const bool take_sample =
      do_sample && info.SampleForCompression() &&
      Random::GetTLSInstance()->OneIn(
          static_cast<int>(info.SampleForCompression())) &&
      sampled_output_fast && sampled_output_slow;
  if (take_sample) {
    // Compresses `raw` with `sample_type`, default options and an empty
    // dictionary; the result of the compression call is ignored.
    const auto compress_sample = [&](CompressionType sample_type,
                                     std::string* sample_output) {
      CompressionContext context(sample_type);
      CompressionOptions options;
      CompressionInfo sample_info(options, context,
                                  CompressionDict::GetEmptyDict(), sample_type,
                                  info.SampleForCompression());
      CompressBlockInternal(raw, sample_info, format_version, sample_output);
    };

    // Fast algorithm: LZ4 if available, else Snappy.
    if (LZ4_Supported() || Snappy_Supported()) {
      compress_sample(LZ4_Supported() ? kLZ4Compression : kSnappyCompression,
                      sampled_output_fast);
    }

    // Slow but high-compression algorithm: ZSTD if available, else Zlib.
    if (ZSTD_Supported() || Zlib_Supported()) {
      compress_sample(ZSTD_Supported() ? kZSTD : kZlibCompression,
                      sampled_output_slow);
    }
  }

  // The actual compression of the block.
  if (requested != kNoCompression &&
      CompressBlockInternal(raw, info, format_version, compressed_output) &&
      GoodCompressionRatio(compressed_output->size(), raw.size())) {
    return *compressed_output;
  }

  // Either the compression method is unsupported or the ratio is not good
  // enough, so fall back to the uncompressed form.
  *type = kNoCompression;
  return raw;
}
208
// kBlockBasedTableMagicNumber was picked by running
//    echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.
// Please note that kBlockBasedTableMagicNumber may also be accessed by other
// .cc files. For that reason it is declared extern in the header, but it must
// be defined without extern in exactly one place so that storage is allocated
// for it.
217 const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull;
218 // We also support reading and writing legacy block based table format (for
219 // backwards compatibility)
220 const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull;
221
222 // A collector that collects properties of interest to block-based table.
223 // For now this class looks heavy-weight since we only write one additional
224 // property.
225 // But in the foreseeable future, we will add more and more properties that are
226 // specific to block-based table.
227 class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
228 : public IntTblPropCollector {
229 public:
BlockBasedTablePropertiesCollector(BlockBasedTableOptions::IndexType index_type,bool whole_key_filtering,bool prefix_filtering)230 explicit BlockBasedTablePropertiesCollector(
231 BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering,
232 bool prefix_filtering)
233 : index_type_(index_type),
234 whole_key_filtering_(whole_key_filtering),
235 prefix_filtering_(prefix_filtering) {}
236
InternalAdd(const Slice &,const Slice &,uint64_t)237 Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/,
238 uint64_t /*file_size*/) override {
239 // Intentionally left blank. Have no interest in collecting stats for
240 // individual key/value pairs.
241 return Status::OK();
242 }
243
BlockAdd(uint64_t,uint64_t,uint64_t)244 virtual void BlockAdd(uint64_t /* blockRawBytes */,
245 uint64_t /* blockCompressedBytesFast */,
246 uint64_t /* blockCompressedBytesSlow */) override {
247 // Intentionally left blank. No interest in collecting stats for
248 // blocks.
249 return;
250 }
251
Finish(UserCollectedProperties * properties)252 Status Finish(UserCollectedProperties* properties) override {
253 std::string val;
254 PutFixed32(&val, static_cast<uint32_t>(index_type_));
255 properties->insert({BlockBasedTablePropertyNames::kIndexType, val});
256 properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering,
257 whole_key_filtering_ ? kPropTrue : kPropFalse});
258 properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering,
259 prefix_filtering_ ? kPropTrue : kPropFalse});
260 return Status::OK();
261 }
262
263 // The name of the properties collector can be used for debugging purpose.
Name() const264 const char* Name() const override {
265 return "BlockBasedTablePropertiesCollector";
266 }
267
GetReadableProperties() const268 UserCollectedProperties GetReadableProperties() const override {
269 // Intentionally left blank.
270 return UserCollectedProperties();
271 }
272
273 private:
274 BlockBasedTableOptions::IndexType index_type_;
275 bool whole_key_filtering_;
276 bool prefix_filtering_;
277 };
278
279 struct BlockBasedTableBuilder::Rep {
280 const ImmutableCFOptions ioptions;
281 const MutableCFOptions moptions;
282 const BlockBasedTableOptions table_options;
283 const InternalKeyComparator& internal_comparator;
284 WritableFileWriter* file;
285 uint64_t offset = 0;
286 Status status;
287 IOStatus io_status;
288 size_t alignment;
289 BlockBuilder data_block;
290 // Buffers uncompressed data blocks and keys to replay later. Needed when
291 // compression dictionary is enabled so we can finalize the dictionary before
292 // compressing any data blocks.
293 // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
294 // blocks as it's redundant, but it's easier to implement for now.
295 std::vector<std::pair<std::string, std::vector<std::string>>>
296 data_block_and_keys_buffers;
297 BlockBuilder range_del_block;
298
299 InternalKeySliceTransform internal_prefix_transform;
300 std::unique_ptr<IndexBuilder> index_builder;
301 PartitionedIndexBuilder* p_index_builder_ = nullptr;
302
303 std::string last_key;
304 CompressionType compression_type;
305 uint64_t sample_for_compression;
306 CompressionOptions compression_opts;
307 std::unique_ptr<CompressionDict> compression_dict;
308 CompressionContext compression_ctx;
309 std::unique_ptr<UncompressionContext> verify_ctx;
310 std::unique_ptr<UncompressionDict> verify_dict;
311
312 size_t data_begin_offset = 0;
313
314 TableProperties props;
315
316 // States of the builder.
317 //
318 // - `kBuffered`: This is the initial state where zero or more data blocks are
319 // accumulated uncompressed in-memory. From this state, call
320 // `EnterUnbuffered()` to finalize the compression dictionary if enabled,
321 // compress/write out any buffered blocks, and proceed to the `kUnbuffered`
322 // state.
323 //
324 // - `kUnbuffered`: This is the state when compression dictionary is finalized
325 // either because it wasn't enabled in the first place or it's been created
326 // from sampling previously buffered data. In this state, blocks are simply
327 // compressed/written out as they fill up. From this state, call `Finish()`
328 // to complete the file (write meta-blocks, etc.), or `Abandon()` to delete
329 // the partially created file.
330 //
331 // - `kClosed`: This indicates either `Finish()` or `Abandon()` has been
332 // called, so the table builder is no longer usable. We must be in this
333 // state by the time the destructor runs.
334 enum class State {
335 kBuffered,
336 kUnbuffered,
337 kClosed,
338 };
339 State state;
340
341 const bool use_delta_encoding_for_index_values;
342 std::unique_ptr<FilterBlockBuilder> filter_builder;
343 char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
344 size_t compressed_cache_key_prefix_size;
345
346 BlockHandle pending_handle; // Handle to add to index block
347
348 std::string compressed_output;
349 std::unique_ptr<FlushBlockPolicy> flush_block_policy;
350 int level_at_creation;
351 uint32_t column_family_id;
352 const std::string& column_family_name;
353 uint64_t creation_time = 0;
354 uint64_t oldest_key_time = 0;
355 const uint64_t target_file_size;
356 uint64_t file_creation_time = 0;
357
358 std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
359
RepROCKSDB_NAMESPACE::BlockBasedTableBuilder::Rep360 Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions,
361 const BlockBasedTableOptions& table_opt,
362 const InternalKeyComparator& icomparator,
363 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
364 int_tbl_prop_collector_factories,
365 uint32_t _column_family_id, WritableFileWriter* f,
366 const CompressionType _compression_type,
367 const uint64_t _sample_for_compression,
368 const CompressionOptions& _compression_opts, const bool skip_filters,
369 const int _level_at_creation, const std::string& _column_family_name,
370 const uint64_t _creation_time, const uint64_t _oldest_key_time,
371 const uint64_t _target_file_size, const uint64_t _file_creation_time)
372 : ioptions(_ioptions),
373 moptions(_moptions),
374 table_options(table_opt),
375 internal_comparator(icomparator),
376 file(f),
377 alignment(table_options.block_align
378 ? std::min(table_options.block_size, kDefaultPageSize)
379 : 0),
380 data_block(table_options.block_restart_interval,
381 table_options.use_delta_encoding,
382 false /* use_value_delta_encoding */,
383 icomparator.user_comparator()
384 ->CanKeysWithDifferentByteContentsBeEqual()
385 ? BlockBasedTableOptions::kDataBlockBinarySearch
386 : table_options.data_block_index_type,
387 table_options.data_block_hash_table_util_ratio),
388 range_del_block(1 /* block_restart_interval */),
389 internal_prefix_transform(_moptions.prefix_extractor.get()),
390 compression_type(_compression_type),
391 sample_for_compression(_sample_for_compression),
392 compression_opts(_compression_opts),
393 compression_dict(),
394 compression_ctx(_compression_type),
395 verify_dict(),
396 state((_compression_opts.max_dict_bytes > 0) ? State::kBuffered
397 : State::kUnbuffered),
398 use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
399 !table_opt.block_align),
400 compressed_cache_key_prefix_size(0),
401 flush_block_policy(
402 table_options.flush_block_policy_factory->NewFlushBlockPolicy(
403 table_options, data_block)),
404 level_at_creation(_level_at_creation),
405 column_family_id(_column_family_id),
406 column_family_name(_column_family_name),
407 creation_time(_creation_time),
408 oldest_key_time(_oldest_key_time),
409 target_file_size(_target_file_size),
410 file_creation_time(_file_creation_time) {
411 if (table_options.index_type ==
412 BlockBasedTableOptions::kTwoLevelIndexSearch) {
413 p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
414 &internal_comparator, use_delta_encoding_for_index_values,
415 table_options);
416 index_builder.reset(p_index_builder_);
417 } else {
418 index_builder.reset(IndexBuilder::CreateIndexBuilder(
419 table_options.index_type, &internal_comparator,
420 &this->internal_prefix_transform, use_delta_encoding_for_index_values,
421 table_options));
422 }
423 if (skip_filters) {
424 filter_builder = nullptr;
425 } else {
426 FilterBuildingContext context(table_options);
427 context.column_family_name = column_family_name;
428 context.compaction_style = ioptions.compaction_style;
429 context.level_at_creation = level_at_creation;
430 context.info_log = ioptions.info_log;
431 filter_builder.reset(CreateFilterBlockBuilder(
432 ioptions, moptions, context, use_delta_encoding_for_index_values,
433 p_index_builder_));
434 }
435
436 for (auto& collector_factories : *int_tbl_prop_collector_factories) {
437 table_properties_collectors.emplace_back(
438 collector_factories->CreateIntTblPropCollector(column_family_id));
439 }
440 table_properties_collectors.emplace_back(
441 new BlockBasedTablePropertiesCollector(
442 table_options.index_type, table_options.whole_key_filtering,
443 _moptions.prefix_extractor != nullptr));
444 if (table_options.verify_compression) {
445 verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
446 compression_type));
447 }
448 }
449
450 Rep(const Rep&) = delete;
451 Rep& operator=(const Rep&) = delete;
452
~RepROCKSDB_NAMESPACE::BlockBasedTableBuilder::Rep453 ~Rep() {}
454 };
455
BlockBasedTableBuilder(const ImmutableCFOptions & ioptions,const MutableCFOptions & moptions,const BlockBasedTableOptions & table_options,const InternalKeyComparator & internal_comparator,const std::vector<std::unique_ptr<IntTblPropCollectorFactory>> * int_tbl_prop_collector_factories,uint32_t column_family_id,WritableFileWriter * file,const CompressionType compression_type,const uint64_t sample_for_compression,const CompressionOptions & compression_opts,const bool skip_filters,const std::string & column_family_name,const int level_at_creation,const uint64_t creation_time,const uint64_t oldest_key_time,const uint64_t target_file_size,const uint64_t file_creation_time)456 BlockBasedTableBuilder::BlockBasedTableBuilder(
457 const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions,
458 const BlockBasedTableOptions& table_options,
459 const InternalKeyComparator& internal_comparator,
460 const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
461 int_tbl_prop_collector_factories,
462 uint32_t column_family_id, WritableFileWriter* file,
463 const CompressionType compression_type,
464 const uint64_t sample_for_compression,
465 const CompressionOptions& compression_opts, const bool skip_filters,
466 const std::string& column_family_name, const int level_at_creation,
467 const uint64_t creation_time, const uint64_t oldest_key_time,
468 const uint64_t target_file_size, const uint64_t file_creation_time) {
469 BlockBasedTableOptions sanitized_table_options(table_options);
470 if (sanitized_table_options.format_version == 0 &&
471 sanitized_table_options.checksum != kCRC32c) {
472 ROCKS_LOG_WARN(
473 ioptions.info_log,
474 "Silently converting format_version to 1 because checksum is "
475 "non-default");
476 // silently convert format_version to 1 to keep consistent with current
477 // behavior
478 sanitized_table_options.format_version = 1;
479 }
480
481 rep_ = new Rep(ioptions, moptions, sanitized_table_options,
482 internal_comparator, int_tbl_prop_collector_factories,
483 column_family_id, file, compression_type,
484 sample_for_compression, compression_opts, skip_filters,
485 level_at_creation, column_family_name, creation_time,
486 oldest_key_time, target_file_size, file_creation_time);
487
488 if (rep_->filter_builder != nullptr) {
489 rep_->filter_builder->StartBlock(0);
490 }
491 if (table_options.block_cache_compressed.get() != nullptr) {
492 BlockBasedTable::GenerateCachePrefix(
493 table_options.block_cache_compressed.get(), file->writable_file(),
494 &rep_->compressed_cache_key_prefix[0],
495 &rep_->compressed_cache_key_prefix_size);
496 }
497 }
498
~BlockBasedTableBuilder()499 BlockBasedTableBuilder::~BlockBasedTableBuilder() {
500 // Catch errors where caller forgot to call Finish()
501 assert(rep_->state == Rep::State::kClosed);
502 delete rep_;
503 }
504
Add(const Slice & key,const Slice & value)505 void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
506 Rep* r = rep_;
507 assert(rep_->state != Rep::State::kClosed);
508 if (!ok()) return;
509 ValueType value_type = ExtractValueType(key);
510 if (IsValueType(value_type)) {
511 #ifndef NDEBUG
512 if (r->props.num_entries > r->props.num_range_deletions) {
513 assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
514 }
515 #endif // NDEBUG
516
517 auto should_flush = r->flush_block_policy->Update(key, value);
518 if (should_flush) {
519 assert(!r->data_block.empty());
520 Flush();
521
522 if (r->state == Rep::State::kBuffered &&
523 r->data_begin_offset > r->target_file_size) {
524 EnterUnbuffered();
525 }
526
527 // Add item to index block.
528 // We do not emit the index entry for a block until we have seen the
529 // first key for the next data block. This allows us to use shorter
530 // keys in the index block. For example, consider a block boundary
531 // between the keys "the quick brown fox" and "the who". We can use
532 // "the r" as the key for the index block entry since it is >= all
533 // entries in the first block and < all entries in subsequent
534 // blocks.
535 if (ok() && r->state == Rep::State::kUnbuffered) {
536 r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle);
537 }
538 }
539
540 // Note: PartitionedFilterBlockBuilder requires key being added to filter
541 // builder after being added to index builder.
542 if (r->state == Rep::State::kUnbuffered && r->filter_builder != nullptr) {
543 size_t ts_sz = r->internal_comparator.user_comparator()->timestamp_size();
544 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
545 }
546
547 r->last_key.assign(key.data(), key.size());
548 r->data_block.Add(key, value);
549 if (r->state == Rep::State::kBuffered) {
550 // Buffer keys to be replayed during `Finish()` once compression
551 // dictionary has been finalized.
552 if (r->data_block_and_keys_buffers.empty() || should_flush) {
553 r->data_block_and_keys_buffers.emplace_back();
554 }
555 r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
556 } else {
557 r->index_builder->OnKeyAdded(key);
558 }
559 NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
560 r->table_properties_collectors,
561 r->ioptions.info_log);
562
563 } else if (value_type == kTypeRangeDeletion) {
564 r->range_del_block.Add(key, value);
565 NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
566 r->table_properties_collectors,
567 r->ioptions.info_log);
568 } else {
569 assert(false);
570 }
571
572 r->props.num_entries++;
573 r->props.raw_key_size += key.size();
574 r->props.raw_value_size += value.size();
575 if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion) {
576 r->props.num_deletions++;
577 } else if (value_type == kTypeRangeDeletion) {
578 r->props.num_deletions++;
579 r->props.num_range_deletions++;
580 } else if (value_type == kTypeMerge) {
581 r->props.num_merge_operands++;
582 }
583 }
584
Flush()585 void BlockBasedTableBuilder::Flush() {
586 Rep* r = rep_;
587 assert(rep_->state != Rep::State::kClosed);
588 if (!ok()) return;
589 if (r->data_block.empty()) return;
590 WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
591 }
592
WriteBlock(BlockBuilder * block,BlockHandle * handle,bool is_data_block)593 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
594 BlockHandle* handle,
595 bool is_data_block) {
596 WriteBlock(block->Finish(), handle, is_data_block);
597 block->Reset();
598 }
599
WriteBlock(const Slice & raw_block_contents,BlockHandle * handle,bool is_data_block)600 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
601 BlockHandle* handle,
602 bool is_data_block) {
603 // File format contains a sequence of blocks where each block has:
604 // block_data: uint8[n]
605 // type: uint8
606 // crc: uint32
607 assert(ok());
608 Rep* r = rep_;
609
610 auto type = r->compression_type;
611 uint64_t sample_for_compression = r->sample_for_compression;
612 Slice block_contents;
613 bool abort_compression = false;
614
615 StopWatchNano timer(
616 r->ioptions.env,
617 ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
618
619 if (r->state == Rep::State::kBuffered) {
620 assert(is_data_block);
621 assert(!r->data_block_and_keys_buffers.empty());
622 r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
623 r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
624 return;
625 }
626
627 if (raw_block_contents.size() < kCompressionSizeLimit) {
628 const CompressionDict* compression_dict;
629 if (!is_data_block || r->compression_dict == nullptr) {
630 compression_dict = &CompressionDict::GetEmptyDict();
631 } else {
632 compression_dict = r->compression_dict.get();
633 }
634 assert(compression_dict != nullptr);
635 CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
636 *compression_dict, type,
637 sample_for_compression);
638
639 std::string sampled_output_fast;
640 std::string sampled_output_slow;
641 block_contents = CompressBlock(
642 raw_block_contents, compression_info, &type,
643 r->table_options.format_version, is_data_block /* do_sample */,
644 &r->compressed_output, &sampled_output_fast, &sampled_output_slow);
645
646 // notify collectors on block add
647 NotifyCollectTableCollectorsOnBlockAdd(
648 r->table_properties_collectors, raw_block_contents.size(),
649 sampled_output_fast.size(), sampled_output_slow.size());
650
651 // Some of the compression algorithms are known to be unreliable. If
652 // the verify_compression flag is set then try to de-compress the
653 // compressed data and compare to the input.
654 if (type != kNoCompression && r->table_options.verify_compression) {
655 // Retrieve the uncompressed contents into a new buffer
656 const UncompressionDict* verify_dict;
657 if (!is_data_block || r->verify_dict == nullptr) {
658 verify_dict = &UncompressionDict::GetEmptyDict();
659 } else {
660 verify_dict = r->verify_dict.get();
661 }
662 assert(verify_dict != nullptr);
663 BlockContents contents;
664 UncompressionInfo uncompression_info(*r->verify_ctx, *verify_dict,
665 r->compression_type);
666 Status stat = UncompressBlockContentsForCompressionType(
667 uncompression_info, block_contents.data(), block_contents.size(),
668 &contents, r->table_options.format_version, r->ioptions);
669
670 if (stat.ok()) {
671 bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
672 if (!compressed_ok) {
673 // The result of the compression was invalid. abort.
674 abort_compression = true;
675 ROCKS_LOG_ERROR(r->ioptions.info_log,
676 "Decompressed block did not match raw block");
677 r->status =
678 Status::Corruption("Decompressed block did not match raw block");
679 }
680 } else {
681 // Decompression reported an error. abort.
682 r->status = Status::Corruption("Could not decompress");
683 abort_compression = true;
684 }
685 }
686 } else {
687 // Block is too big to be compressed.
688 abort_compression = true;
689 }
690
691 // Abort compression if the block is too big, or did not pass
692 // verification.
693 if (abort_compression) {
694 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
695 type = kNoCompression;
696 block_contents = raw_block_contents;
697 } else if (type != kNoCompression) {
698 if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) {
699 RecordTimeToHistogram(r->ioptions.statistics, COMPRESSION_TIMES_NANOS,
700 timer.ElapsedNanos());
701 }
702 RecordInHistogram(r->ioptions.statistics, BYTES_COMPRESSED,
703 raw_block_contents.size());
704 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED);
705 } else if (type != r->compression_type) {
706 RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
707 }
708
709 WriteRawBlock(block_contents, type, handle, is_data_block);
710 r->compressed_output.clear();
711 if (is_data_block) {
712 if (r->filter_builder != nullptr) {
713 r->filter_builder->StartBlock(r->offset);
714 }
715 r->props.data_size = r->offset;
716 ++r->props.num_data_blocks;
717 }
718 }
719
WriteRawBlock(const Slice & block_contents,CompressionType type,BlockHandle * handle,bool is_data_block)720 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
721 CompressionType type,
722 BlockHandle* handle,
723 bool is_data_block) {
724 Rep* r = rep_;
725 StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS);
726 handle->set_offset(r->offset);
727 handle->set_size(block_contents.size());
728 assert(r->status.ok());
729 assert(r->io_status.ok());
730 r->io_status = r->file->Append(block_contents);
731 if (r->io_status.ok()) {
732 char trailer[kBlockTrailerSize];
733 trailer[0] = type;
734 char* trailer_without_type = trailer + 1;
735 switch (r->table_options.checksum) {
736 case kNoChecksum:
737 EncodeFixed32(trailer_without_type, 0);
738 break;
739 case kCRC32c: {
740 auto crc = crc32c::Value(block_contents.data(), block_contents.size());
741 crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type
742 EncodeFixed32(trailer_without_type, crc32c::Mask(crc));
743 break;
744 }
745 case kxxHash: {
746 XXH32_state_t* const state = XXH32_createState();
747 XXH32_reset(state, 0);
748 XXH32_update(state, block_contents.data(),
749 static_cast<uint32_t>(block_contents.size()));
750 XXH32_update(state, trailer, 1); // Extend to cover block type
751 EncodeFixed32(trailer_without_type, XXH32_digest(state));
752 XXH32_freeState(state);
753 break;
754 }
755 case kxxHash64: {
756 XXH64_state_t* const state = XXH64_createState();
757 XXH64_reset(state, 0);
758 XXH64_update(state, block_contents.data(),
759 static_cast<uint32_t>(block_contents.size()));
760 XXH64_update(state, trailer, 1); // Extend to cover block type
761 EncodeFixed32(
762 trailer_without_type,
763 static_cast<uint32_t>(XXH64_digest(state) & // lower 32 bits
764 uint64_t{0xffffffff}));
765 XXH64_freeState(state);
766 break;
767 }
768 }
769
770 assert(r->io_status.ok());
771 TEST_SYNC_POINT_CALLBACK(
772 "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
773 static_cast<char*>(trailer));
774 r->io_status = r->file->Append(Slice(trailer, kBlockTrailerSize));
775 if (r->io_status.ok()) {
776 r->status = InsertBlockInCache(block_contents, type, handle);
777 }
778 if (r->status.ok() && r->io_status.ok()) {
779 r->offset += block_contents.size() + kBlockTrailerSize;
780 if (r->table_options.block_align && is_data_block) {
781 size_t pad_bytes =
782 (r->alignment - ((block_contents.size() + kBlockTrailerSize) &
783 (r->alignment - 1))) &
784 (r->alignment - 1);
785 r->io_status = r->file->Pad(pad_bytes);
786 if (r->io_status.ok()) {
787 r->offset += pad_bytes;
788 }
789 }
790 }
791 }
792 r->status = r->io_status;
793 }
794
status() const795 Status BlockBasedTableBuilder::status() const { return rep_->status; }
796
io_status() const797 IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; }
798
799 //
800 // Make a copy of the block contents and insert into compressed block cache
801 //
InsertBlockInCache(const Slice & block_contents,const CompressionType type,const BlockHandle * handle)802 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
803 const CompressionType type,
804 const BlockHandle* handle) {
805 Rep* r = rep_;
806 Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
807
808 if (type != kNoCompression && block_cache_compressed != nullptr) {
809 size_t size = block_contents.size();
810
811 auto ubuf =
812 AllocateBlock(size + 1, block_cache_compressed->memory_allocator());
813 memcpy(ubuf.get(), block_contents.data(), size);
814 ubuf[size] = type;
815
816 BlockContents* block_contents_to_cache =
817 new BlockContents(std::move(ubuf), size);
818 #ifndef NDEBUG
819 block_contents_to_cache->is_raw_block = true;
820 #endif // NDEBUG
821
822 // make cache key by appending the file offset to the cache prefix id
823 char* end = EncodeVarint64(
824 r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size,
825 handle->offset());
826 Slice key(r->compressed_cache_key_prefix,
827 static_cast<size_t>(end - r->compressed_cache_key_prefix));
828
829 // Insert into compressed block cache.
830 block_cache_compressed->Insert(
831 key, block_contents_to_cache,
832 block_contents_to_cache->ApproximateMemoryUsage(),
833 SimpleDeleter<BlockContents>::GetInstance());
834
835 // Invalidate OS cache.
836 r->file->InvalidateCache(static_cast<size_t>(r->offset), size);
837 }
838 return Status::OK();
839 }
840
WriteFilterBlock(MetaIndexBuilder * meta_index_builder)841 void BlockBasedTableBuilder::WriteFilterBlock(
842 MetaIndexBuilder* meta_index_builder) {
843 BlockHandle filter_block_handle;
844 bool empty_filter_block = (rep_->filter_builder == nullptr ||
845 rep_->filter_builder->NumAdded() == 0);
846 if (ok() && !empty_filter_block) {
847 Status s = Status::Incomplete();
848 while (ok() && s.IsIncomplete()) {
849 Slice filter_content =
850 rep_->filter_builder->Finish(filter_block_handle, &s);
851 assert(s.ok() || s.IsIncomplete());
852 rep_->props.filter_size += filter_content.size();
853 WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
854 }
855 }
856 if (ok() && !empty_filter_block) {
857 // Add mapping from "<filter_block_prefix>.Name" to location
858 // of filter data.
859 std::string key;
860 if (rep_->filter_builder->IsBlockBased()) {
861 key = BlockBasedTable::kFilterBlockPrefix;
862 } else {
863 key = rep_->table_options.partition_filters
864 ? BlockBasedTable::kPartitionedFilterBlockPrefix
865 : BlockBasedTable::kFullFilterBlockPrefix;
866 }
867 key.append(rep_->table_options.filter_policy->Name());
868 meta_index_builder->Add(key, filter_block_handle);
869 }
870 }
871
WriteIndexBlock(MetaIndexBuilder * meta_index_builder,BlockHandle * index_block_handle)872 void BlockBasedTableBuilder::WriteIndexBlock(
873 MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) {
874 IndexBuilder::IndexBlocks index_blocks;
875 auto index_builder_status = rep_->index_builder->Finish(&index_blocks);
876 if (index_builder_status.IsIncomplete()) {
877 // We we have more than one index partition then meta_blocks are not
878 // supported for the index. Currently meta_blocks are used only by
879 // HashIndexBuilder which is not multi-partition.
880 assert(index_blocks.meta_blocks.empty());
881 } else if (ok() && !index_builder_status.ok()) {
882 rep_->status = index_builder_status;
883 }
884 if (ok()) {
885 for (const auto& item : index_blocks.meta_blocks) {
886 BlockHandle block_handle;
887 WriteBlock(item.second, &block_handle, false /* is_data_block */);
888 if (!ok()) {
889 break;
890 }
891 meta_index_builder->Add(item.first, block_handle);
892 }
893 }
894 if (ok()) {
895 if (rep_->table_options.enable_index_compression) {
896 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
897 } else {
898 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
899 index_block_handle);
900 }
901 }
902 // If there are more index partitions, finish them and write them out
903 Status s = index_builder_status;
904 while (ok() && s.IsIncomplete()) {
905 s = rep_->index_builder->Finish(&index_blocks, *index_block_handle);
906 if (!s.ok() && !s.IsIncomplete()) {
907 rep_->status = s;
908 return;
909 }
910 if (rep_->table_options.enable_index_compression) {
911 WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
912 } else {
913 WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
914 index_block_handle);
915 }
916 // The last index_block_handle will be for the partition index block
917 }
918 }
919
WritePropertiesBlock(MetaIndexBuilder * meta_index_builder)920 void BlockBasedTableBuilder::WritePropertiesBlock(
921 MetaIndexBuilder* meta_index_builder) {
922 BlockHandle properties_block_handle;
923 if (ok()) {
924 PropertyBlockBuilder property_block_builder;
925 rep_->props.column_family_id = rep_->column_family_id;
926 rep_->props.column_family_name = rep_->column_family_name;
927 rep_->props.filter_policy_name =
928 rep_->table_options.filter_policy != nullptr
929 ? rep_->table_options.filter_policy->Name()
930 : "";
931 rep_->props.index_size =
932 rep_->index_builder->IndexSize() + kBlockTrailerSize;
933 rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr
934 ? rep_->ioptions.user_comparator->Name()
935 : "nullptr";
936 rep_->props.merge_operator_name =
937 rep_->ioptions.merge_operator != nullptr
938 ? rep_->ioptions.merge_operator->Name()
939 : "nullptr";
940 rep_->props.compression_name =
941 CompressionTypeToString(rep_->compression_type);
942 rep_->props.compression_options =
943 CompressionOptionsToString(rep_->compression_opts);
944 rep_->props.prefix_extractor_name =
945 rep_->moptions.prefix_extractor != nullptr
946 ? rep_->moptions.prefix_extractor->Name()
947 : "nullptr";
948
949 std::string property_collectors_names = "[";
950 for (size_t i = 0;
951 i < rep_->ioptions.table_properties_collector_factories.size(); ++i) {
952 if (i != 0) {
953 property_collectors_names += ",";
954 }
955 property_collectors_names +=
956 rep_->ioptions.table_properties_collector_factories[i]->Name();
957 }
958 property_collectors_names += "]";
959 rep_->props.property_collectors_names = property_collectors_names;
960 if (rep_->table_options.index_type ==
961 BlockBasedTableOptions::kTwoLevelIndexSearch) {
962 assert(rep_->p_index_builder_ != nullptr);
963 rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions();
964 rep_->props.top_level_index_size =
965 rep_->p_index_builder_->TopLevelIndexSize(rep_->offset);
966 }
967 rep_->props.index_key_is_user_key =
968 !rep_->index_builder->seperator_is_key_plus_seq();
969 rep_->props.index_value_is_delta_encoded =
970 rep_->use_delta_encoding_for_index_values;
971 rep_->props.creation_time = rep_->creation_time;
972 rep_->props.oldest_key_time = rep_->oldest_key_time;
973 rep_->props.file_creation_time = rep_->file_creation_time;
974
975 // Add basic properties
976 property_block_builder.AddTableProperty(rep_->props);
977
978 // Add use collected properties
979 NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors,
980 rep_->ioptions.info_log,
981 &property_block_builder);
982
983 WriteRawBlock(property_block_builder.Finish(), kNoCompression,
984 &properties_block_handle);
985 }
986 if (ok()) {
987 #ifndef NDEBUG
988 {
989 uint64_t props_block_offset = properties_block_handle.offset();
990 uint64_t props_block_size = properties_block_handle.size();
991 TEST_SYNC_POINT_CALLBACK(
992 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
993 &props_block_offset);
994 TEST_SYNC_POINT_CALLBACK(
995 "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
996 &props_block_size);
997 }
998 #endif // !NDEBUG
999 meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
1000 }
1001 }
1002
WriteCompressionDictBlock(MetaIndexBuilder * meta_index_builder)1003 void BlockBasedTableBuilder::WriteCompressionDictBlock(
1004 MetaIndexBuilder* meta_index_builder) {
1005 if (rep_->compression_dict != nullptr &&
1006 rep_->compression_dict->GetRawDict().size()) {
1007 BlockHandle compression_dict_block_handle;
1008 if (ok()) {
1009 WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
1010 &compression_dict_block_handle);
1011 #ifndef NDEBUG
1012 Slice compression_dict = rep_->compression_dict->GetRawDict();
1013 TEST_SYNC_POINT_CALLBACK(
1014 "BlockBasedTableBuilder::WriteCompressionDictBlock:RawDict",
1015 &compression_dict);
1016 #endif // NDEBUG
1017 }
1018 if (ok()) {
1019 meta_index_builder->Add(kCompressionDictBlock,
1020 compression_dict_block_handle);
1021 }
1022 }
1023 }
1024
WriteRangeDelBlock(MetaIndexBuilder * meta_index_builder)1025 void BlockBasedTableBuilder::WriteRangeDelBlock(
1026 MetaIndexBuilder* meta_index_builder) {
1027 if (ok() && !rep_->range_del_block.empty()) {
1028 BlockHandle range_del_block_handle;
1029 WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
1030 &range_del_block_handle);
1031 meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
1032 }
1033 }
1034
WriteFooter(BlockHandle & metaindex_block_handle,BlockHandle & index_block_handle)1035 void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
1036 BlockHandle& index_block_handle) {
1037 Rep* r = rep_;
1038 // No need to write out new footer if we're using default checksum.
1039 // We're writing legacy magic number because we want old versions of RocksDB
1040 // be able to read files generated with new release (just in case if
1041 // somebody wants to roll back after an upgrade)
1042 // TODO(icanadi) at some point in the future, when we're absolutely sure
1043 // nobody will roll back to RocksDB 2.x versions, retire the legacy magic
1044 // number and always write new table files with new magic number
1045 bool legacy = (r->table_options.format_version == 0);
1046 // this is guaranteed by BlockBasedTableBuilder's constructor
1047 assert(r->table_options.checksum == kCRC32c ||
1048 r->table_options.format_version != 0);
1049 Footer footer(
1050 legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber,
1051 r->table_options.format_version);
1052 footer.set_metaindex_handle(metaindex_block_handle);
1053 footer.set_index_handle(index_block_handle);
1054 footer.set_checksum(r->table_options.checksum);
1055 std::string footer_encoding;
1056 footer.EncodeTo(&footer_encoding);
1057 assert(r->status.ok());
1058 assert(r->io_status.ok());
1059 r->io_status = r->file->Append(footer_encoding);
1060 if (r->io_status.ok()) {
1061 r->offset += footer_encoding.size();
1062 }
1063 r->status = r->io_status;
1064 }
1065
EnterUnbuffered()1066 void BlockBasedTableBuilder::EnterUnbuffered() {
1067 Rep* r = rep_;
1068 assert(r->state == Rep::State::kBuffered);
1069 r->state = Rep::State::kUnbuffered;
1070 const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
1071 ? r->compression_opts.zstd_max_train_bytes
1072 : r->compression_opts.max_dict_bytes;
1073 Random64 generator{r->creation_time};
1074 std::string compression_dict_samples;
1075 std::vector<size_t> compression_dict_sample_lens;
1076 if (!r->data_block_and_keys_buffers.empty()) {
1077 while (compression_dict_samples.size() < kSampleBytes) {
1078 size_t rand_idx =
1079 static_cast<size_t>(
1080 generator.Uniform(r->data_block_and_keys_buffers.size()));
1081 size_t copy_len =
1082 std::min(kSampleBytes - compression_dict_samples.size(),
1083 r->data_block_and_keys_buffers[rand_idx].first.size());
1084 compression_dict_samples.append(
1085 r->data_block_and_keys_buffers[rand_idx].first, 0, copy_len);
1086 compression_dict_sample_lens.emplace_back(copy_len);
1087 }
1088 }
1089
1090 // final data block flushed, now we can generate dictionary from the samples.
1091 // OK if compression_dict_samples is empty, we'll just get empty dictionary.
1092 std::string dict;
1093 if (r->compression_opts.zstd_max_train_bytes > 0) {
1094 dict = ZSTD_TrainDictionary(compression_dict_samples,
1095 compression_dict_sample_lens,
1096 r->compression_opts.max_dict_bytes);
1097 } else {
1098 dict = std::move(compression_dict_samples);
1099 }
1100 r->compression_dict.reset(new CompressionDict(dict, r->compression_type,
1101 r->compression_opts.level));
1102 r->verify_dict.reset(new UncompressionDict(
1103 dict, r->compression_type == kZSTD ||
1104 r->compression_type == kZSTDNotFinalCompression));
1105
1106 for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
1107 const auto& data_block = r->data_block_and_keys_buffers[i].first;
1108 auto& keys = r->data_block_and_keys_buffers[i].second;
1109 assert(!data_block.empty());
1110 assert(!keys.empty());
1111
1112 for (const auto& key : keys) {
1113 if (r->filter_builder != nullptr) {
1114 size_t ts_sz =
1115 r->internal_comparator.user_comparator()->timestamp_size();
1116 r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
1117 }
1118 r->index_builder->OnKeyAdded(key);
1119 }
1120 WriteBlock(Slice(data_block), &r->pending_handle, true /* is_data_block */);
1121 if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
1122 Slice first_key_in_next_block =
1123 r->data_block_and_keys_buffers[i + 1].second.front();
1124 Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
1125 r->index_builder->AddIndexEntry(&keys.back(), first_key_in_next_block_ptr,
1126 r->pending_handle);
1127 }
1128 }
1129 r->data_block_and_keys_buffers.clear();
1130 }
1131
Finish()1132 Status BlockBasedTableBuilder::Finish() {
1133 Rep* r = rep_;
1134 assert(r->state != Rep::State::kClosed);
1135 bool empty_data_block = r->data_block.empty();
1136 Flush();
1137 if (r->state == Rep::State::kBuffered) {
1138 EnterUnbuffered();
1139 }
1140 // To make sure properties block is able to keep the accurate size of index
1141 // block, we will finish writing all index entries first.
1142 if (ok() && !empty_data_block) {
1143 r->index_builder->AddIndexEntry(
1144 &r->last_key, nullptr /* no next data block */, r->pending_handle);
1145 }
1146
1147 // Write meta blocks, metaindex block and footer in the following order.
1148 // 1. [meta block: filter]
1149 // 2. [meta block: index]
1150 // 3. [meta block: compression dictionary]
1151 // 4. [meta block: range deletion tombstone]
1152 // 5. [meta block: properties]
1153 // 6. [metaindex block]
1154 // 7. Footer
1155 BlockHandle metaindex_block_handle, index_block_handle;
1156 MetaIndexBuilder meta_index_builder;
1157 WriteFilterBlock(&meta_index_builder);
1158 WriteIndexBlock(&meta_index_builder, &index_block_handle);
1159 WriteCompressionDictBlock(&meta_index_builder);
1160 WriteRangeDelBlock(&meta_index_builder);
1161 WritePropertiesBlock(&meta_index_builder);
1162 if (ok()) {
1163 // flush the meta index block
1164 WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
1165 &metaindex_block_handle);
1166 }
1167 if (ok()) {
1168 WriteFooter(metaindex_block_handle, index_block_handle);
1169 }
1170 r->state = Rep::State::kClosed;
1171 return r->status;
1172 }
1173
Abandon()1174 void BlockBasedTableBuilder::Abandon() {
1175 assert(rep_->state != Rep::State::kClosed);
1176 rep_->state = Rep::State::kClosed;
1177 }
1178
NumEntries() const1179 uint64_t BlockBasedTableBuilder::NumEntries() const {
1180 return rep_->props.num_entries;
1181 }
1182
FileSize() const1183 uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; }
1184
NeedCompact() const1185 bool BlockBasedTableBuilder::NeedCompact() const {
1186 for (const auto& collector : rep_->table_properties_collectors) {
1187 if (collector->NeedCompact()) {
1188 return true;
1189 }
1190 }
1191 return false;
1192 }
1193
GetTableProperties() const1194 TableProperties BlockBasedTableBuilder::GetTableProperties() const {
1195 TableProperties ret = rep_->props;
1196 for (const auto& collector : rep_->table_properties_collectors) {
1197 for (const auto& prop : collector->GetReadableProperties()) {
1198 ret.readable_properties.insert(prop);
1199 }
1200 collector->Finish(&ret.user_collected_properties);
1201 }
1202 return ret;
1203 }
1204
GetFileChecksum() const1205 std::string BlockBasedTableBuilder::GetFileChecksum() const {
1206 if (rep_->file != nullptr) {
1207 return rep_->file->GetFileChecksum();
1208 } else {
1209 return kUnknownFileChecksum;
1210 }
1211 }
1212
GetFileChecksumFuncName() const1213 const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
1214 if (rep_->file != nullptr) {
1215 return rep_->file->GetFileChecksumFuncName();
1216 } else {
1217 return kUnknownFileChecksumFuncName.c_str();
1218 }
1219 }
1220
1221 const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
1222 const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
1223 const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =
1224 "partitionedfilter.";
1225 } // namespace ROCKSDB_NAMESPACE
1226