// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "table/block_based_table_builder.h" #include #include #include #include #include #include #include #include #include "db/dbformat.h" #include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/merge_operator.h" #include "rocksdb/table.h" #include "table/block.h" #include "table/block_based_filter_block.h" #include "table/block_based_table_factory.h" #include "table/block_based_table_reader.h" #include "table/block_builder.h" #include "table/filter_block.h" #include "table/format.h" #include "table/full_filter_block.h" #include "table/table_builder.h" #include "util/string_util.h" #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" #include "util/stop_watch.h" #include "util/xxhash.h" #include "table/index_builder.h" #include "table/partitioned_filter_block.h" namespace rocksdb { extern const std::string kHashIndexPrefixesBlock; extern const std::string kHashIndexPrefixesMetadataBlock; typedef BlockBasedTableOptions::IndexType IndexType; // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { // Create a filter block builder based on its type. FilterBlockBuilder* CreateFilterBlockBuilder( const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt, const BlockBasedTableOptions& table_opt, PartitionedIndexBuilder* const p_index_builder) { if (table_opt.filter_policy == nullptr) return nullptr; FilterBitsBuilder* filter_bits_builder = table_opt.filter_policy->GetFilterBitsBuilder(); if (filter_bits_builder == nullptr) { return new BlockBasedFilterBlockBuilder(mopt.prefix_extractor.get(), table_opt); } else { if (table_opt.partition_filters) { assert(p_index_builder != nullptr); // Since after partition cut request from filter builder it takes time // until index builder actully cuts the partition, we take the lower bound // as partition size. assert(table_opt.block_size_deviation <= 100); auto partition_size = static_cast( ((table_opt.metadata_block_size * (100 - table_opt.block_size_deviation)) + 99) / 100); partition_size = std::max(partition_size, static_cast(1)); return new PartitionedFilterBlockBuilder( mopt.prefix_extractor.get(), table_opt.whole_key_filtering, filter_bits_builder, table_opt.index_block_restart_interval, p_index_builder, partition_size); } else { return new FullFilterBlockBuilder(mopt.prefix_extractor.get(), table_opt.whole_key_filtering, filter_bits_builder); } } } bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { // Check to see if compressed less than 12.5% return compressed_size < raw_size - (raw_size / 8u); } } // namespace // format_version is the block format as defined in include/rocksdb/table.h Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, std::string* compressed_output) { *type = compression_ctx.type(); if (compression_ctx.type() == kNoCompression) { return raw; } // Will return compressed block contents if (1) the compression method is // supported in this platform and (2) the compression rate is "good enough". switch (compression_ctx.type()) { case kSnappyCompression: if (Snappy_Compress(compression_ctx, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kZlibCompression: if (Zlib_Compress( compression_ctx, GetCompressFormatForVersion(kZlibCompression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kBZip2Compression: if (BZip2_Compress( compression_ctx, GetCompressFormatForVersion(kBZip2Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kLZ4Compression: if (LZ4_Compress( compression_ctx, GetCompressFormatForVersion(kLZ4Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kLZ4HCCompression: if (LZ4HC_Compress( compression_ctx, GetCompressFormatForVersion(kLZ4HCCompression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kXpressCompression: if (XPRESS_Compress(raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; case kZSTD: case kZSTDNotFinalCompression: if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. default: {} // Do not recognize this compression type } // Compression method is not supported, or not good compression ratio, so just // fall back to uncompressed form. *type = kNoCompression; return raw; } // kBlockBasedTableMagicNumber was picked by running // echo rocksdb.table.block_based | sha1sum // and taking the leading 64 bits. // Please note that kBlockBasedTableMagicNumber may also be accessed by other // .cc files // for that reason we declare it extern in the header but to get the space // allocated // it must be not extern in one place. const uint64_t kBlockBasedTableMagicNumber = 0x88e241b785f4cff7ull; // We also support reading and writing legacy block based table format (for // backwards compatibility) const uint64_t kLegacyBlockBasedTableMagicNumber = 0xdb4775248b80fb57ull; // A collector that collects properties of interest to block-based table. // For now this class looks heavy-weight since we only write one additional // property. // But in the foreseeable future, we will add more and more properties that are // specific to block-based table. class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector : public IntTblPropCollector { public: explicit BlockBasedTablePropertiesCollector( BlockBasedTableOptions::IndexType index_type, bool whole_key_filtering, bool prefix_filtering) : index_type_(index_type), whole_key_filtering_(whole_key_filtering), prefix_filtering_(prefix_filtering) {} virtual Status InternalAdd(const Slice& /*key*/, const Slice& /*value*/, uint64_t /*file_size*/) override { // Intentionally left blank. Have no interest in collecting stats for // individual key/value pairs. return Status::OK(); } virtual Status Finish(UserCollectedProperties* properties) override { std::string val; PutFixed32(&val, static_cast(index_type_)); properties->insert({BlockBasedTablePropertyNames::kIndexType, val}); properties->insert({BlockBasedTablePropertyNames::kWholeKeyFiltering, whole_key_filtering_ ? kPropTrue : kPropFalse}); properties->insert({BlockBasedTablePropertyNames::kPrefixFiltering, prefix_filtering_ ? kPropTrue : kPropFalse}); return Status::OK(); } // The name of the properties collector can be used for debugging purpose. virtual const char* Name() const override { return "BlockBasedTablePropertiesCollector"; } virtual UserCollectedProperties GetReadableProperties() const override { // Intentionally left blank. return UserCollectedProperties(); } private: BlockBasedTableOptions::IndexType index_type_; bool whole_key_filtering_; bool prefix_filtering_; }; struct BlockBasedTableBuilder::Rep { const ImmutableCFOptions ioptions; const MutableCFOptions moptions; const BlockBasedTableOptions table_options; const InternalKeyComparator& internal_comparator; WritableFileWriter* file; uint64_t offset = 0; Status status; size_t alignment; BlockBuilder data_block; BlockBuilder range_del_block; InternalKeySliceTransform internal_prefix_transform; std::unique_ptr index_builder; PartitionedIndexBuilder* p_index_builder_ = nullptr; std::string last_key; // Compression dictionary or nullptr const std::string* compression_dict; CompressionContext compression_ctx; std::unique_ptr verify_ctx; TableProperties props; bool closed = false; // Either Finish() or Abandon() has been called. std::unique_ptr filter_builder; char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize]; size_t compressed_cache_key_prefix_size; BlockHandle pending_handle; // Handle to add to index block std::string compressed_output; std::unique_ptr flush_block_policy; uint32_t column_family_id; const std::string& column_family_name; uint64_t creation_time = 0; uint64_t oldest_key_time = 0; std::vector> table_properties_collectors; Rep(const ImmutableCFOptions& _ioptions, const MutableCFOptions& _moptions, const BlockBasedTableOptions& table_opt, const InternalKeyComparator& icomparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t _column_family_id, WritableFileWriter* f, const CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, const bool skip_filters, const std::string& _column_family_name, const uint64_t _creation_time, const uint64_t _oldest_key_time) : ioptions(_ioptions), moptions(_moptions), table_options(table_opt), internal_comparator(icomparator), file(f), alignment(table_options.block_align ? std::min(table_options.block_size, kDefaultPageSize) : 0), data_block(table_options.block_restart_interval, table_options.use_delta_encoding), range_del_block(1 /* block_restart_interval */), internal_prefix_transform(_moptions.prefix_extractor.get()), compression_dict(_compression_dict), compression_ctx(_compression_type, _compression_opts), compressed_cache_key_prefix_size(0), flush_block_policy( table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options, data_block)), column_family_id(_column_family_id), column_family_name(_column_family_name), creation_time(_creation_time), oldest_key_time(_oldest_key_time) { if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( &internal_comparator, table_options); index_builder.reset(p_index_builder_); } else { index_builder.reset(IndexBuilder::CreateIndexBuilder( table_options.index_type, &internal_comparator, &this->internal_prefix_transform, table_options)); } if (skip_filters) { filter_builder = nullptr; } else { filter_builder.reset(CreateFilterBlockBuilder( _ioptions, _moptions, table_options, p_index_builder_)); } for (auto& collector_factories : *int_tbl_prop_collector_factories) { table_properties_collectors.emplace_back( collector_factories->CreateIntTblPropCollector(column_family_id)); } table_properties_collectors.emplace_back( new BlockBasedTablePropertiesCollector( table_options.index_type, table_options.whole_key_filtering, _moptions.prefix_extractor != nullptr)); if (table_options.verify_compression) { verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), compression_ctx.type())); } } Rep(const Rep&) = delete; Rep& operator=(const Rep&) = delete; ~Rep() {} }; BlockBasedTableBuilder::BlockBasedTableBuilder( const ImmutableCFOptions& ioptions, const MutableCFOptions& moptions, const BlockBasedTableOptions& table_options, const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, const std::string& column_family_name, const uint64_t creation_time, const uint64_t oldest_key_time) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { ROCKS_LOG_WARN( ioptions.info_log, "Silently converting format_version to 1 because checksum is " "non-default"); // silently convert format_version to 1 to keep consistent with current // behavior sanitized_table_options.format_version = 1; } rep_ = new Rep(ioptions, moptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, compression_type, compression_opts, compression_dict, skip_filters, column_family_name, creation_time, oldest_key_time); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); } if (table_options.block_cache_compressed.get() != nullptr) { BlockBasedTable::GenerateCachePrefix( table_options.block_cache_compressed.get(), file->writable_file(), &rep_->compressed_cache_key_prefix[0], &rep_->compressed_cache_key_prefix_size); } } BlockBasedTableBuilder::~BlockBasedTableBuilder() { assert(rep_->closed); // Catch errors where caller forgot to call Finish() delete rep_; } void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { Rep* r = rep_; assert(!r->closed); if (!ok()) return; ValueType value_type = ExtractValueType(key); if (IsValueType(value_type)) { if (r->props.num_entries > 0) { assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0); } auto should_flush = r->flush_block_policy->Update(key, value); if (should_flush) { assert(!r->data_block.empty()); Flush(); // Add item to index block. // We do not emit the index entry for a block until we have seen the // first key for the next data block. This allows us to use shorter // keys in the index block. For example, consider a block boundary // between the keys "the quick brown fox" and "the who". We can use // "the r" as the key for the index block entry since it is >= all // entries in the first block and < all entries in subsequent // blocks. if (ok()) { r->index_builder->AddIndexEntry(&r->last_key, &key, r->pending_handle); } } // Note: PartitionedFilterBlockBuilder requires key being added to filter // builder after being added to index builder. if (r->filter_builder != nullptr) { r->filter_builder->Add(ExtractUserKey(key)); } r->last_key.assign(key.data(), key.size()); r->data_block.Add(key, value); r->props.num_entries++; r->props.raw_key_size += key.size(); r->props.raw_value_size += value.size(); r->index_builder->OnKeyAdded(key); NotifyCollectTableCollectorsOnAdd(key, value, r->offset, r->table_properties_collectors, r->ioptions.info_log); } else if (value_type == kTypeRangeDeletion) { r->range_del_block.Add(key, value); ++r->props.num_range_deletions; r->props.raw_key_size += key.size(); r->props.raw_value_size += value.size(); NotifyCollectTableCollectorsOnAdd(key, value, r->offset, r->table_properties_collectors, r->ioptions.info_log); } else { assert(false); } } void BlockBasedTableBuilder::Flush() { Rep* r = rep_; assert(!r->closed); if (!ok()) return; if (r->data_block.empty()) return; WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); if (r->filter_builder != nullptr) { r->filter_builder->StartBlock(r->offset); } r->props.data_size = r->offset; ++r->props.num_data_blocks; } void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block) { WriteBlock(block->Finish(), handle, is_data_block); block->Reset(); } void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, BlockHandle* handle, bool is_data_block) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 // crc: uint32 assert(ok()); Rep* r = rep_; auto type = r->compression_ctx.type(); Slice block_contents; bool abort_compression = false; StopWatchNano timer(r->ioptions.env, ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); if (raw_block_contents.size() < kCompressionSizeLimit) { Slice compression_dict; if (is_data_block && r->compression_dict && r->compression_dict->size()) { r->compression_ctx.dict() = *r->compression_dict; if (r->table_options.verify_compression) { assert(r->verify_ctx != nullptr); r->verify_ctx->dict() = *r->compression_dict; } } else { // Clear dictionary r->compression_ctx.dict() = Slice(); if (r->table_options.verify_compression) { assert(r->verify_ctx != nullptr); r->verify_ctx->dict() = Slice(); } } block_contents = CompressBlock(raw_block_contents, r->compression_ctx, &type, r->table_options.format_version, &r->compressed_output); // Some of the compression algorithms are known to be unreliable. If // the verify_compression flag is set then try to de-compress the // compressed data and compare to the input. if (type != kNoCompression && r->table_options.verify_compression) { // Retrieve the uncompressed contents into a new buffer BlockContents contents; Status stat = UncompressBlockContentsForCompressionType( *r->verify_ctx, block_contents.data(), block_contents.size(), &contents, r->table_options.format_version, r->ioptions); if (stat.ok()) { bool compressed_ok = contents.data.compare(raw_block_contents) == 0; if (!compressed_ok) { // The result of the compression was invalid. abort. abort_compression = true; ROCKS_LOG_ERROR(r->ioptions.info_log, "Decompressed block did not match raw block"); r->status = Status::Corruption("Decompressed block did not match raw block"); } } else { // Decompression reported an error. abort. r->status = Status::Corruption("Could not decompress"); abort_compression = true; } } } else { // Block is too big to be compressed. abort_compression = true; } // Abort compression if the block is too big, or did not pass // verification. if (abort_compression) { RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); type = kNoCompression; block_contents = raw_block_contents; } else if (type != kNoCompression) { if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)) { MeasureTime(r->ioptions.statistics, COMPRESSION_TIMES_NANOS, timer.ElapsedNanos()); } MeasureTime(r->ioptions.statistics, BYTES_COMPRESSED, raw_block_contents.size()); RecordTick(r->ioptions.statistics, NUMBER_BLOCK_COMPRESSED); } WriteRawBlock(block_contents, type, handle, is_data_block); r->compressed_output.clear(); } void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, CompressionType type, BlockHandle* handle, bool is_data_block) { Rep* r = rep_; StopWatch sw(r->ioptions.env, r->ioptions.statistics, WRITE_RAW_BLOCK_MICROS); handle->set_offset(r->offset); handle->set_size(block_contents.size()); assert(r->status.ok()); r->status = r->file->Append(block_contents); if (r->status.ok()) { char trailer[kBlockTrailerSize]; trailer[0] = type; char* trailer_without_type = trailer + 1; switch (r->table_options.checksum) { case kNoChecksum: EncodeFixed32(trailer_without_type, 0); break; case kCRC32c: { auto crc = crc32c::Value(block_contents.data(), block_contents.size()); crc = crc32c::Extend(crc, trailer, 1); // Extend to cover block type EncodeFixed32(trailer_without_type, crc32c::Mask(crc)); break; } case kxxHash: { void* xxh = XXH32_init(0); XXH32_update(xxh, block_contents.data(), static_cast(block_contents.size())); XXH32_update(xxh, trailer, 1); // Extend to cover block type EncodeFixed32(trailer_without_type, XXH32_digest(xxh)); break; } } assert(r->status.ok()); r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); if (r->status.ok()) { r->status = InsertBlockInCache(block_contents, type, handle); } if (r->status.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; if (r->table_options.block_align && is_data_block) { size_t pad_bytes = (r->alignment - ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) & (r->alignment - 1); r->status = r->file->Pad(pad_bytes); if (r->status.ok()) { r->offset += pad_bytes; } } } } } Status BlockBasedTableBuilder::status() const { return rep_->status; } static void DeleteCachedBlock(const Slice& /*key*/, void* value) { Block* block = reinterpret_cast(value); delete block; } // // Make a copy of the block contents and insert into compressed block cache // Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, const CompressionType type, const BlockHandle* handle) { Rep* r = rep_; Cache* block_cache_compressed = r->table_options.block_cache_compressed.get(); if (type != kNoCompression && block_cache_compressed != nullptr) { size_t size = block_contents.size(); std::unique_ptr ubuf(new char[size + 1]); memcpy(ubuf.get(), block_contents.data(), size); ubuf[size] = type; BlockContents results(std::move(ubuf), size, true, type); Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber); // make cache key by appending the file offset to the cache prefix id char* end = EncodeVarint64( r->compressed_cache_key_prefix + r->compressed_cache_key_prefix_size, handle->offset()); Slice key(r->compressed_cache_key_prefix, static_cast (end - r->compressed_cache_key_prefix)); // Insert into compressed block cache. block_cache_compressed->Insert(key, block, block->ApproximateMemoryUsage(), &DeleteCachedBlock); // Invalidate OS cache. r->file->InvalidateCache(static_cast(r->offset), size); } return Status::OK(); } void BlockBasedTableBuilder::WriteFilterBlock( MetaIndexBuilder* meta_index_builder) { BlockHandle filter_block_handle; bool empty_filter_block = (rep_->filter_builder == nullptr || rep_->filter_builder->NumAdded() == 0); if (ok() && !empty_filter_block) { Status s = Status::Incomplete(); while (ok() && s.IsIncomplete()) { Slice filter_content = rep_->filter_builder->Finish(filter_block_handle, &s); assert(s.ok() || s.IsIncomplete()); rep_->props.filter_size += filter_content.size(); WriteRawBlock(filter_content, kNoCompression, &filter_block_handle); } } if (ok() && !empty_filter_block) { // Add mapping from ".Name" to location // of filter data. std::string key; if (rep_->filter_builder->IsBlockBased()) { key = BlockBasedTable::kFilterBlockPrefix; } else { key = rep_->table_options.partition_filters ? BlockBasedTable::kPartitionedFilterBlockPrefix : BlockBasedTable::kFullFilterBlockPrefix; } key.append(rep_->table_options.filter_policy->Name()); meta_index_builder->Add(key, filter_block_handle); } } void BlockBasedTableBuilder::WriteIndexBlock( MetaIndexBuilder* meta_index_builder, BlockHandle* index_block_handle) { IndexBuilder::IndexBlocks index_blocks; auto index_builder_status = rep_->index_builder->Finish(&index_blocks); if (index_builder_status.IsIncomplete()) { // We we have more than one index partition then meta_blocks are not // supported for the index. Currently meta_blocks are used only by // HashIndexBuilder which is not multi-partition. assert(index_blocks.meta_blocks.empty()); } else if (ok() && !index_builder_status.ok()) { rep_->status = index_builder_status; } if (ok()) { for (const auto& item : index_blocks.meta_blocks) { BlockHandle block_handle; WriteBlock(item.second, &block_handle, false /* is_data_block */); if (!ok()) { break; } meta_index_builder->Add(item.first, block_handle); } } if (ok()) { if (rep_->table_options.enable_index_compression) { WriteBlock(index_blocks.index_block_contents, index_block_handle, false); } else { WriteRawBlock(index_blocks.index_block_contents, kNoCompression, index_block_handle); } } // If there are more index partitions, finish them and write them out Status s = index_builder_status; while (ok() && s.IsIncomplete()) { s = rep_->index_builder->Finish(&index_blocks, *index_block_handle); if (!s.ok() && !s.IsIncomplete()) { rep_->status = s; return; } if (rep_->table_options.enable_index_compression) { WriteBlock(index_blocks.index_block_contents, index_block_handle, false); } else { WriteRawBlock(index_blocks.index_block_contents, kNoCompression, index_block_handle); } // The last index_block_handle will be for the partition index block } } void BlockBasedTableBuilder::WritePropertiesBlock( MetaIndexBuilder* meta_index_builder) { BlockHandle properties_block_handle; if (ok()) { PropertyBlockBuilder property_block_builder; rep_->props.column_family_id = rep_->column_family_id; rep_->props.column_family_name = rep_->column_family_name; rep_->props.filter_policy_name = rep_->table_options.filter_policy != nullptr ? rep_->table_options.filter_policy->Name() : ""; rep_->props.index_size = rep_->index_builder->IndexSize() + kBlockTrailerSize; rep_->props.comparator_name = rep_->ioptions.user_comparator != nullptr ? rep_->ioptions.user_comparator->Name() : "nullptr"; rep_->props.merge_operator_name = rep_->ioptions.merge_operator != nullptr ? rep_->ioptions.merge_operator->Name() : "nullptr"; rep_->props.compression_name = CompressionTypeToString(rep_->compression_ctx.type()); rep_->props.prefix_extractor_name = rep_->moptions.prefix_extractor != nullptr ? rep_->moptions.prefix_extractor->Name() : "nullptr"; std::string property_collectors_names = "["; for (size_t i = 0; i < rep_->ioptions.table_properties_collector_factories.size(); ++i) { if (i != 0) { property_collectors_names += ","; } property_collectors_names += rep_->ioptions.table_properties_collector_factories[i]->Name(); } property_collectors_names += "]"; rep_->props.property_collectors_names = property_collectors_names; if (rep_->table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { assert(rep_->p_index_builder_ != nullptr); rep_->props.index_partitions = rep_->p_index_builder_->NumPartitions(); rep_->props.top_level_index_size = rep_->p_index_builder_->TopLevelIndexSize(rep_->offset); } rep_->props.index_key_is_user_key = !rep_->index_builder->seperator_is_key_plus_seq(); rep_->props.creation_time = rep_->creation_time; rep_->props.oldest_key_time = rep_->oldest_key_time; // Add basic properties property_block_builder.AddTableProperty(rep_->props); // Add use collected properties NotifyCollectTableCollectorsOnFinish(rep_->table_properties_collectors, rep_->ioptions.info_log, &property_block_builder); WriteRawBlock(property_block_builder.Finish(), kNoCompression, &properties_block_handle); } if (ok()) { meta_index_builder->Add(kPropertiesBlock, properties_block_handle); } } void BlockBasedTableBuilder::WriteCompressionDictBlock( MetaIndexBuilder* meta_index_builder) { if (rep_->compression_dict && rep_->compression_dict->size()) { BlockHandle compression_dict_block_handle; if (ok()) { WriteRawBlock(*rep_->compression_dict, kNoCompression, &compression_dict_block_handle); } if (ok()) { meta_index_builder->Add(kCompressionDictBlock, compression_dict_block_handle); } } } void BlockBasedTableBuilder::WriteRangeDelBlock( MetaIndexBuilder* meta_index_builder) { if (ok() && !rep_->range_del_block.empty()) { BlockHandle range_del_block_handle; WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression, &range_del_block_handle); meta_index_builder->Add(kRangeDelBlock, range_del_block_handle); } } Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; bool empty_data_block = r->data_block.empty(); Flush(); assert(!r->closed); r->closed = true; // To make sure properties block is able to keep the accurate size of index // block, we will finish writing all index entries first. if (ok() && !empty_data_block) { r->index_builder->AddIndexEntry( &r->last_key, nullptr /* no next data block */, r->pending_handle); } // Write meta blocks and metaindex block with the following order. // 1. [meta block: filter] // 2. [meta block: index] // 3. [meta block: compression dictionary] // 4. [meta block: range deletion tombstone] // 5. [meta block: properties] // 6. [metaindex block] BlockHandle metaindex_block_handle, index_block_handle; MetaIndexBuilder meta_index_builder; WriteFilterBlock(&meta_index_builder); WriteIndexBlock(&meta_index_builder, &index_block_handle); WriteCompressionDictBlock(&meta_index_builder); WriteRangeDelBlock(&meta_index_builder); WritePropertiesBlock(&meta_index_builder); if (ok()) { // flush the meta index block WriteRawBlock(meta_index_builder.Finish(), kNoCompression, &metaindex_block_handle); } // Write footer if (ok()) { // No need to write out new footer if we're using default checksum. // We're writing legacy magic number because we want old versions of RocksDB // be able to read files generated with new release (just in case if // somebody wants to roll back after an upgrade) // TODO(icanadi) at some point in the future, when we're absolutely sure // nobody will roll back to RocksDB 2.x versions, retire the legacy magic // number and always write new table files with new magic number bool legacy = (r->table_options.format_version == 0); // this is guaranteed by BlockBasedTableBuilder's constructor assert(r->table_options.checksum == kCRC32c || r->table_options.format_version != 0); Footer footer(legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber, r->table_options.format_version); footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); footer.set_checksum(r->table_options.checksum); std::string footer_encoding; footer.EncodeTo(&footer_encoding); assert(r->status.ok()); r->status = r->file->Append(footer_encoding); if (r->status.ok()) { r->offset += footer_encoding.size(); } } return r->status; } void BlockBasedTableBuilder::Abandon() { Rep* r = rep_; assert(!r->closed); r->closed = true; } uint64_t BlockBasedTableBuilder::NumEntries() const { return rep_->props.num_entries; } uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } bool BlockBasedTableBuilder::NeedCompact() const { for (const auto& collector : rep_->table_properties_collectors) { if (collector->NeedCompact()) { return true; } } return false; } TableProperties BlockBasedTableBuilder::GetTableProperties() const { TableProperties ret = rep_->props; for (const auto& collector : rep_->table_properties_collectors) { for (const auto& prop : collector->GetReadableProperties()) { ret.readable_properties.insert(prop); } collector->Finish(&ret.user_collected_properties); } return ret; } const std::string BlockBasedTable::kFilterBlockPrefix = "filter."; const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter."; const std::string BlockBasedTable::kPartitionedFilterBlockPrefix = "partitionedfilter."; } // namespace rocksdb