From 0463f6183772474285be11f08251d4acc2f89793 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Fri, 7 Dec 2018 13:15:09 -0800 Subject: [PATCH] Refactor BlockBasedTable::Open (#4636) Summary: Refactored and simplified `BlockBasedTable::Open` to be similar to `BlockBasedTableBuilder::Finish` as both these functions complement each other. Also added `BlockBasedTableBuilder::WriteFooter` along the way. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4636 Differential Revision: D12933319 Pulled By: sagar0 fbshipit-source-id: 1ff1d02f6d80a63b5ba720a1fc75e71c7344137b --- table/block_based_table_builder.cc | 59 +++--- table/block_based_table_builder.h | 2 + table/block_based_table_reader.cc | 318 +++++++++++++++++------------ table/block_based_table_reader.h | 23 ++- 4 files changed, 245 insertions(+), 157 deletions(-) diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index a4007b07a..2ef40a2cf 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -875,6 +875,35 @@ void BlockBasedTableBuilder::WriteRangeDelBlock( } } +void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle, + BlockHandle& index_block_handle) { + Rep* r = rep_; + // No need to write out new footer if we're using default checksum. + // We're writing legacy magic number because we want old versions of RocksDB + // be able to read files generated with new release (just in case if + // somebody wants to roll back after an upgrade) + // TODO(icanadi) at some point in the future, when we're absolutely sure + // nobody will roll back to RocksDB 2.x versions, retire the legacy magic + // number and always write new table files with new magic number + bool legacy = (r->table_options.format_version == 0); + // this is guaranteed by BlockBasedTableBuilder's constructor + assert(r->table_options.checksum == kCRC32c || + r->table_options.format_version != 0); + Footer footer( + legacy ? kLegacyBlockBasedTableMagicNumber : kBlockBasedTableMagicNumber, + r->table_options.format_version); + footer.set_metaindex_handle(metaindex_block_handle); + footer.set_index_handle(index_block_handle); + footer.set_checksum(r->table_options.checksum); + std::string footer_encoding; + footer.EncodeTo(&footer_encoding); + assert(r->status.ok()); + r->status = r->file->Append(footer_encoding); + if (r->status.ok()) { + r->offset += footer_encoding.size(); + } +} + Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; bool empty_data_block = r->data_block.empty(); @@ -889,13 +918,14 @@ Status BlockBasedTableBuilder::Finish() { &r->last_key, nullptr /* no next data block */, r->pending_handle); } - // Write meta blocks and metaindex block with the following order. + // Write meta blocks, metaindex block and footer in the following order. // 1. [meta block: filter] // 2. [meta block: index] // 3. [meta block: compression dictionary] // 4. [meta block: range deletion tombstone] // 5. [meta block: properties] // 6. [metaindex block] + // 7. Footer BlockHandle metaindex_block_handle, index_block_handle; MetaIndexBuilder meta_index_builder; WriteFilterBlock(&meta_index_builder); @@ -908,33 +938,8 @@ Status BlockBasedTableBuilder::Finish() { WriteRawBlock(meta_index_builder.Finish(), kNoCompression, &metaindex_block_handle); } - - // Write footer if (ok()) { - // No need to write out new footer if we're using default checksum. - // We're writing legacy magic number because we want old versions of RocksDB - // be able to read files generated with new release (just in case if - // somebody wants to roll back after an upgrade) - // TODO(icanadi) at some point in the future, when we're absolutely sure - // nobody will roll back to RocksDB 2.x versions, retire the legacy magic - // number and always write new table files with new magic number - bool legacy = (r->table_options.format_version == 0); - // this is guaranteed by BlockBasedTableBuilder's constructor - assert(r->table_options.checksum == kCRC32c || - r->table_options.format_version != 0); - Footer footer(legacy ? kLegacyBlockBasedTableMagicNumber - : kBlockBasedTableMagicNumber, - r->table_options.format_version); - footer.set_metaindex_handle(metaindex_block_handle); - footer.set_index_handle(index_block_handle); - footer.set_checksum(r->table_options.checksum); - std::string footer_encoding; - footer.EncodeTo(&footer_encoding); - assert(r->status.ok()); - r->status = r->file->Append(footer_encoding); - if (r->status.ok()) { - r->offset += footer_encoding.size(); - } + WriteFooter(metaindex_block_handle, index_block_handle); } return r->status; diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index eba0bb7c1..4dc715fd9 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -114,6 +114,8 @@ class BlockBasedTableBuilder : public TableBuilder { void WritePropertiesBlock(MetaIndexBuilder* meta_index_builder); void WriteCompressionDictBlock(MetaIndexBuilder* meta_index_builder); void WriteRangeDelBlock(MetaIndexBuilder* meta_index_builder); + void WriteFooter(BlockHandle& metaindex_block_handle, + BlockHandle& index_block_handle); struct Rep; class BlockBasedTablePropertiesCollectorFactory; diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index fbc9af4ba..3fba7b103 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -780,49 +780,25 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, TailPrefetchStats* tail_prefetch_stats) { table_reader->reset(); + Status s; Footer footer; - std::unique_ptr prefetch_buffer; // prefetch both index and filters, down to all partitions const bool prefetch_all = prefetch_index_and_filter_in_cache || level == 0; const bool preload_all = !table_options.cache_index_and_filter_blocks; - size_t tail_prefetch_size = 0; - if (tail_prefetch_stats != nullptr) { - // Multiple threads may get a 0 (no history) when running in parallel, - // but it will get cleared after the first of them finishes. - tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize(); - } - if (tail_prefetch_size == 0) { - // Before read footer, readahead backwards to prefetch data. Do more - // readahead if we're going to read index/filter. - // TODO: This may incorrectly select small readahead in case partitioned - // index/filter is enabled and top-level partition pinning is enabled. - // That's because we need to issue readahead before we read the properties, - // at which point we don't yet know the index type. - tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024; - } - size_t prefetch_off; - size_t prefetch_len; - if (file_size < tail_prefetch_size) { - prefetch_off = 0; - prefetch_len = static_cast(file_size); - } else { - prefetch_off = static_cast(file_size - tail_prefetch_size); - prefetch_len = tail_prefetch_size; - } - TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen", - &tail_prefetch_size); - Status s; - // TODO should not have this special logic in the future. - if (!file->use_direct_io()) { - prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true)); - s = file->Prefetch(prefetch_off, prefetch_len); - } else { - prefetch_buffer.reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true)); - s = prefetch_buffer->Prefetch(file.get(), prefetch_off, prefetch_len); - } + s = PrefetchTail(file.get(), file_size, tail_prefetch_stats, prefetch_all, + preload_all, &prefetch_buffer); + + // Read in the following order: + // 1. Footer + // 2. [metaindex block] + // 3. [meta block: properties] + // 4. [meta block: range deletion tombstone] + // 5. [meta block: compression dictionary] + // 6. [meta block: index] + // 7. [meta block: filter] s = ReadFooterFromFile(file.get(), prefetch_buffer.get(), file_size, &footer, kBlockBasedTableMagicNumber); if (!s.ok()) { @@ -857,9 +833,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, PersistentCacheOptions(rep->table_options.persistent_cache, std::string(rep->persistent_cache_key_prefix, rep->persistent_cache_key_prefix_size), - rep->ioptions.statistics); + rep->ioptions.statistics); - // Read meta index + // Read metaindex std::unique_ptr meta; std::unique_ptr meta_iter; s = ReadMetaBlock(rep, prefetch_buffer.get(), &meta, &meta_iter); @@ -867,38 +843,85 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, return s; } - // Find filter handle and filter type - if (rep->filter_policy) { - for (auto filter_type : - {Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter, - Rep::FilterType::kBlockFilter}) { - std::string prefix; - switch (filter_type) { - case Rep::FilterType::kFullFilter: - prefix = kFullFilterBlockPrefix; - break; - case Rep::FilterType::kPartitionedFilter: - prefix = kPartitionedFilterBlockPrefix; - break; - case Rep::FilterType::kBlockFilter: - prefix = kFilterBlockPrefix; - break; - default: - assert(0); - } - std::string filter_block_key = prefix; - filter_block_key.append(rep->filter_policy->Name()); - if (FindMetaBlock(meta_iter.get(), filter_block_key, &rep->filter_handle) - .ok()) { - rep->filter_type = filter_type; - break; - } + s = ReadPropertiesBlock(rep, prefetch_buffer.get(), meta_iter.get(), + largest_seqno); + if (!s.ok()) { + return s; + } + // Disregard return status of ReadRangeDelBlock and ReadCompressionDictBlock. + s = ReadRangeDelBlock(rep, prefetch_buffer.get(), meta_iter.get(), + new_table.get(), internal_comparator); + s = ReadCompressionDictBlock(rep, prefetch_buffer.get(), meta_iter.get()); + + s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(), + new_table.get(), prefix_extractor, + prefetch_all, table_options, level, + prefetch_index_and_filter_in_cache); + + if (s.ok()) { + // Update tail prefetch stats + assert(prefetch_buffer.get() != nullptr); + if (tail_prefetch_stats != nullptr) { + assert(prefetch_buffer->min_offset_read() < file_size); + tail_prefetch_stats->RecordEffectiveSize( + static_cast(file_size) - prefetch_buffer->min_offset_read()); } + + *table_reader = std::move(new_table); } - // Read the properties + return s; +} + +Status BlockBasedTable::PrefetchTail( + RandomAccessFileReader* file, uint64_t file_size, + TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, + const bool preload_all, + std::unique_ptr* prefetch_buffer) { + size_t tail_prefetch_size = 0; + if (tail_prefetch_stats != nullptr) { + // Multiple threads may get a 0 (no history) when running in parallel, + // but it will get cleared after the first of them finishes. + tail_prefetch_size = tail_prefetch_stats->GetSuggestedPrefetchSize(); + } + if (tail_prefetch_size == 0) { + // Before read footer, readahead backwards to prefetch data. Do more + // readahead if we're going to read index/filter. + // TODO: This may incorrectly select small readahead in case partitioned + // index/filter is enabled and top-level partition pinning is enabled. + // That's because we need to issue readahead before we read the properties, + // at which point we don't yet know the index type. + tail_prefetch_size = prefetch_all || preload_all ? 512 * 1024 : 4 * 1024; + } + size_t prefetch_off; + size_t prefetch_len; + if (file_size < tail_prefetch_size) { + prefetch_off = 0; + prefetch_len = static_cast(file_size); + } else { + prefetch_off = static_cast(file_size - tail_prefetch_size); + prefetch_len = tail_prefetch_size; + } + TEST_SYNC_POINT_CALLBACK("BlockBasedTable::Open::TailPrefetchLen", + &tail_prefetch_size); + Status s; + // TODO should not have this special logic in the future. + if (!file->use_direct_io()) { + prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, false, true)); + s = file->Prefetch(prefetch_off, prefetch_len); + } else { + prefetch_buffer->reset(new FilePrefetchBuffer(nullptr, 0, 0, true, true)); + s = (*prefetch_buffer)->Prefetch(file, prefetch_off, prefetch_len); + } + return s; +} + +Status BlockBasedTable::ReadPropertiesBlock( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, + const SequenceNumber largest_seqno) { bool found_properties_block = true; - s = SeekToPropertiesBlock(meta_iter.get(), &found_properties_block); + Status s; + s = SeekToPropertiesBlock(meta_iter, &found_properties_block); if (!s.ok()) { ROCKS_LOG_WARN(rep->ioptions.info_log, @@ -908,9 +931,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, s = meta_iter->status(); TableProperties* table_properties = nullptr; if (s.ok()) { - s = ReadProperties(meta_iter->value(), rep->file.get(), - prefetch_buffer.get(), rep->footer, rep->ioptions, - &table_properties, + s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer, + rep->footer, rep->ioptions, &table_properties, false /* compression_type_missing */, nullptr /* memory_allocator */); } @@ -937,11 +959,66 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, } #endif // ROCKSDB_LITE - // Read the compression dictionary meta block + // Read the table properties, if provided. + if (rep->table_properties) { + rep->whole_key_filtering &= + IsFeatureSupported(*(rep->table_properties), + BlockBasedTablePropertyNames::kWholeKeyFiltering, + rep->ioptions.info_log); + rep->prefix_filtering &= IsFeatureSupported( + *(rep->table_properties), + BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log); + + s = GetGlobalSequenceNumber(*(rep->table_properties), largest_seqno, + &(rep->global_seqno)); + if (!s.ok()) { + ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str()); + } + } + return s; +} + +Status BlockBasedTable::ReadRangeDelBlock( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, + BlockBasedTable* new_table, + const InternalKeyComparator& internal_comparator) { + Status s; + bool found_range_del_block; + s = SeekToRangeDelBlock(meta_iter, &found_range_del_block, + &rep->range_del_handle); + if (!s.ok()) { + ROCKS_LOG_WARN( + rep->ioptions.info_log, + "Error when seeking to range delete tombstones block from file: %s", + s.ToString().c_str()); + } else if (found_range_del_block && !rep->range_del_handle.IsNull()) { + ReadOptions read_options; + s = MaybeReadBlockAndLoadToCache( + prefetch_buffer, rep, read_options, rep->range_del_handle, + Slice() /* compression_dict */, &rep->range_del_entry, + false /* is_index */, nullptr /* get_context */); + if (!s.ok()) { + ROCKS_LOG_WARN( + rep->ioptions.info_log, + "Encountered error while reading data from range del block %s", + s.ToString().c_str()); + } + auto iter = std::unique_ptr( + new_table->NewUnfragmentedRangeTombstoneIterator(read_options)); + rep->fragmented_range_dels = std::make_shared( + std::move(iter), internal_comparator); + } + return s; +} + +Status BlockBasedTable::ReadCompressionDictBlock( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter) { + Status s; bool found_compression_dict; BlockHandle compression_dict_handle; - s = SeekToCompressionDictBlock(meta_iter.get(), &found_compression_dict, - &compression_dict_handle); + s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict, + &compression_dict_handle); if (!s.ok()) { ROCKS_LOG_WARN( rep->ioptions.info_log, @@ -955,7 +1032,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, ReadOptions read_options; read_options.verify_checksums = false; BlockFetcher compression_block_fetcher( - rep->file.get(), prefetch_buffer.get(), rep->footer, read_options, + rep->file.get(), prefetch_buffer, rep->footer, read_options, compression_dict_handle, compression_dict_cont.get(), rep->ioptions, false /* decompress */, false /*maybe_compressed*/, Slice() /*compression dict*/, cache_options); @@ -971,52 +1048,45 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, rep->compression_dict_block = std::move(compression_dict_cont); } } + return s; +} - // Read the table properties, if provided. - if (rep->table_properties) { - rep->whole_key_filtering &= - IsFeatureSupported(*(rep->table_properties), - BlockBasedTablePropertyNames::kWholeKeyFiltering, - rep->ioptions.info_log); - rep->prefix_filtering &= IsFeatureSupported( - *(rep->table_properties), - BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log); +Status BlockBasedTable::PrefetchIndexAndFilterBlocks( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, + BlockBasedTable* new_table, const SliceTransform* prefix_extractor, + bool prefetch_all, const BlockBasedTableOptions& table_options, + const int level, const bool prefetch_index_and_filter_in_cache) { + Status s; - s = GetGlobalSequenceNumber(*(rep->table_properties), largest_seqno, - &(rep->global_seqno)); - if (!s.ok()) { - ROCKS_LOG_ERROR(rep->ioptions.info_log, "%s", s.ToString().c_str()); - return s; + // Find filter handle and filter type + if (rep->filter_policy) { + for (auto filter_type : + {Rep::FilterType::kFullFilter, Rep::FilterType::kPartitionedFilter, + Rep::FilterType::kBlockFilter}) { + std::string prefix; + switch (filter_type) { + case Rep::FilterType::kFullFilter: + prefix = kFullFilterBlockPrefix; + break; + case Rep::FilterType::kPartitionedFilter: + prefix = kPartitionedFilterBlockPrefix; + break; + case Rep::FilterType::kBlockFilter: + prefix = kFilterBlockPrefix; + break; + default: + assert(0); + } + std::string filter_block_key = prefix; + filter_block_key.append(rep->filter_policy->Name()); + if (FindMetaBlock(meta_iter, filter_block_key, &rep->filter_handle) + .ok()) { + rep->filter_type = filter_type; + break; + } } } - // Read the range del meta block - bool found_range_del_block; - s = SeekToRangeDelBlock(meta_iter.get(), &found_range_del_block, - &rep->range_del_handle); - if (!s.ok()) { - ROCKS_LOG_WARN( - rep->ioptions.info_log, - "Error when seeking to range delete tombstones block from file: %s", - s.ToString().c_str()); - } else if (found_range_del_block && !rep->range_del_handle.IsNull()) { - ReadOptions read_options; - s = MaybeReadBlockAndLoadToCache( - prefetch_buffer.get(), rep, read_options, rep->range_del_handle, - Slice() /* compression_dict */, &rep->range_del_entry, - false /* is_index */, nullptr /* get_context */); - if (!s.ok()) { - ROCKS_LOG_WARN( - rep->ioptions.info_log, - "Encountered error while reading data from range del block %s", - s.ToString().c_str()); - } - auto iter = std::unique_ptr( - new_table->NewUnfragmentedRangeTombstoneIterator(read_options)); - rep->fragmented_range_dels = std::make_shared( - std::move(iter), internal_comparator); - } - bool need_upper_bound_check = PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor); @@ -1097,8 +1167,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // pre-load these blocks, which will kept in member variables in Rep // and with a same life-time as this table object. IndexReader* index_reader = nullptr; - s = new_table->CreateIndexReader(prefetch_buffer.get(), &index_reader, - meta_iter.get(), level); + s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, meta_iter, + level); if (s.ok()) { rep->index_reader.reset(index_reader); // The partitions of partitioned index are always stored in cache. They @@ -1111,9 +1181,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // Set filter block if (rep->filter_policy) { const bool is_a_filter_partition = true; - auto filter = new_table->ReadFilter( - prefetch_buffer.get(), rep->filter_handle, !is_a_filter_partition, - rep->table_prefix_extractor.get()); + auto filter = new_table->ReadFilter(prefetch_buffer, rep->filter_handle, + !is_a_filter_partition, + rep->table_prefix_extractor.get()); rep->filter.reset(filter); // Refer to the comment above about paritioned indexes always being // cached @@ -1126,16 +1196,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, } } - if (s.ok()) { - assert(prefetch_buffer.get() != nullptr); - if (tail_prefetch_stats != nullptr) { - assert(prefetch_buffer->min_offset_read() < file_size); - tail_prefetch_stats->RecordEffectiveSize( - static_cast(file_size) - prefetch_buffer->min_offset_read()); - } - *table_reader = std::move(new_table); - } - return s; } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index cb6a86566..3fe0d0c1a 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -351,10 +351,31 @@ class BlockBasedTable : public TableReader { const Slice& user_key, const bool no_io, const SliceTransform* prefix_extractor = nullptr) const; - // Read the meta block from sst. + static Status PrefetchTail( + RandomAccessFileReader* file, uint64_t file_size, + TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all, + const bool preload_all, + std::unique_ptr* prefetch_buffer); static Status ReadMetaBlock(Rep* rep, FilePrefetchBuffer* prefetch_buffer, std::unique_ptr* meta_block, std::unique_ptr* iter); + static Status ReadPropertiesBlock(Rep* rep, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, + const SequenceNumber largest_seqno); + static Status ReadRangeDelBlock( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, BlockBasedTable* new_table, + const InternalKeyComparator& internal_comparator); + static Status ReadCompressionDictBlock(Rep* rep, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter); + static Status PrefetchIndexAndFilterBlocks( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_iter, BlockBasedTable* new_table, + const SliceTransform* prefix_extractor, bool prefetch_all, + const BlockBasedTableOptions& table_options, const int level, + const bool prefetch_index_and_filter_in_cache); Status VerifyChecksumInBlocks(InternalIteratorBase* index_iter); Status VerifyChecksumInBlocks(InternalIteratorBase* index_iter);