diff --git a/HISTORY.md b/HISTORY.md
index 42fd34181..a74567380 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,7 @@
 ### New Features
 * Made the EventListener extend the Customizable class.
 * EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
+* Insert warm blocks (data blocks, uncompressed dict blocks, index and filter blocks) in the block cache during flush under option BlockBasedTableOptions.prepopulate_block_cache. Previously, this was enabled only for data blocks.
 
 ### Performance Improvements
 * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.
diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc
index 6c03dbf44..faf6f8a50 100644
--- a/db/db_block_cache_test.cc
+++ b/db/db_block_cache_test.cc
@@ -497,12 +497,48 @@ TEST_F(DBBlockCacheTest, WarmCacheWithDataBlocksDuringFlush) {
     ASSERT_OK(Put(ToString(i), value));
     ASSERT_OK(Flush());
     ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
-    ASSERT_EQ(value, Get(ToString(i)));
+
+    ASSERT_EQ(value, Get(ToString(i)));
+
     ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
     ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
   }
 }
+
+// This test caches all types of blocks during flush.
+TEST_F(DBBlockCacheTest, WarmCacheWithBlocksDuringFlush) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
+
+  BlockBasedTableOptions table_options;
+  table_options.block_cache = NewLRUCache(1 << 25, 0, false);
+  table_options.cache_index_and_filter_blocks = true;
+  table_options.prepopulate_block_cache =
+      BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
+  table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  DestroyAndReopen(options);
+
+  std::string value(kValueSize, 'a');
+  for (size_t i = 1; i < 2; i++) {
+    ASSERT_OK(Put(ToString(i), value));
+    ASSERT_OK(Flush());
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
+
+    ASSERT_EQ(value, Get(ToString(i)));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
+    ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS));
+    ASSERT_EQ(i * 3, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT));
+
+    ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_MISS));
+    ASSERT_EQ(i * 2,
+              options.statistics->getTickerCount(BLOCK_CACHE_FILTER_HIT));
+  }
+}
 #endif
 
 namespace {
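[Reviewer note, not part of the patch] For anyone trying this end to end outside the test harness: a minimal sketch of enabling the option in an application and confirming the warming through the public statistics API. The DB path and cache size are arbitrary, and error handling is mostly elided.

// Sketch: with kFlushOnly, a flush should bump the data, index, and filter
// ADD tickers; before this patch only BLOCK_CACHE_DATA_ADD moved.
#include <cassert>
#include <iostream>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/statistics.h"
#include "rocksdb/table.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.statistics = rocksdb::CreateDBStatistics();

  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = rocksdb::NewLRUCache(32 << 20);  // 32 MB
  table_options.cache_index_and_filter_blocks = true;
  table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false));
  // Warm data, index, filter, and uncompressed dict blocks during flush.
  table_options.prepopulate_block_cache =
      rocksdb::BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/warm_cache_demo", &db);
  assert(s.ok());
  assert(db->Put(rocksdb::WriteOptions(), "key", "value").ok());
  assert(db->Flush(rocksdb::FlushOptions()).ok());

  std::cout << "data adds:   "
            << options.statistics->getTickerCount(rocksdb::BLOCK_CACHE_DATA_ADD)
            << "\nindex adds:  "
            << options.statistics->getTickerCount(
                   rocksdb::BLOCK_CACHE_INDEX_ADD)
            << "\nfilter adds: "
            << options.statistics->getTickerCount(
                   rocksdb::BLOCK_CACHE_FILTER_ADD)
            << "\n";
  delete db;
  return 0;
}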
diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h
index 13b31ee47..94045ff6b 100644
--- a/include/rocksdb/table.h
+++ b/include/rocksdb/table.h
@@ -464,22 +464,18 @@ struct BlockBasedTableOptions {
   // Default: 256 KB (256 * 1024).
   size_t max_auto_readahead_size = 256 * 1024;
 
-  // If enabled, prepopulate warm/hot data blocks which are already in memory
-  // into block cache at the time of flush. On a flush, the data block that is
-  // in memory (in memtables) get flushed to the device. If using Direct IO,
-  // additional IO is incurred to read this data back into memory again, which
-  // is avoided by enabling this option. This further helps if the workload
-  // exhibits high temporal locality, where most of the reads go to recently
-  // written data. This also helps in case of Distributed FileSystem.
-  //
-  // Right now, this is enabled only for flush for data blocks. We plan to
-  // expand this option to cover compactions in the future and for other types
-  // of blocks.
+  // If enabled, prepopulate warm/hot blocks (data, uncompressed dict, index
+  // and filter blocks) which are already in memory into the block cache at
+  // the time of flush. On a flush, the block that is in memory (in memtables)
+  // gets flushed to the device. If using Direct IO, additional IO is incurred
+  // to read this data back into memory again, which is avoided by enabling
+  // this option. This further helps if the workload exhibits high temporal
+  // locality, where most of the reads go to recently written data. This also
+  // helps in the case of a distributed file system.
   enum class PrepopulateBlockCache : char {
     // Disable prepopulate block cache.
     kDisable,
-    // Prepopulate data blocks during flush only. Plan to extend it to all block
-    // types.
+    // Prepopulate blocks during flush only.
     kFlushOnly,
   };
 
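[Reviewer note, not part of the patch] The kFlushOnly plumbing added to WriteRawBlock below only ever inserts uncompressed bytes into the (uncompressed) block cache: the on-disk contents when the block was not compressed, otherwise the saved pre-compression contents. A simplified, self-contained sketch of that gating; the names here are stand-ins, not the real internals.

#include <string>

// Hypothetical stand-ins for the real RocksDB types, for illustration only.
enum class CompressionType { kNoCompression, kSnappyCompression };
enum class PrepopulateBlockCache { kDisable, kFlushOnly };

struct BuilderSketch {
  PrepopulateBlockCache prepopulate = PrepopulateBlockCache::kFlushOnly;

  // Stand-in for InsertBlockInCacheHelper: always receives uncompressed bytes.
  void InsertUncompressed(const std::string& /*contents*/) {}

  // Mirrors the gating in WriteRawBlock: runs for every block type once
  // kFlushOnly is set, and picks whichever copy of the bytes is uncompressed.
  void AfterBlockAppended(const std::string& on_disk_contents,
                          CompressionType type,
                          const std::string* raw_contents) {
    if (prepopulate != PrepopulateBlockCache::kFlushOnly) {
      return;
    }
    if (type == CompressionType::kNoCompression) {
      InsertUncompressed(on_disk_contents);  // written bytes are already raw
    } else if (raw_contents != nullptr) {
      InsertUncompressed(*raw_contents);  // cache the pre-compression bytes
    }
  }
};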
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index bb8cfa14d..189a5dd99 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -36,6 +36,7 @@
 #include "table/block_based/block_based_table_factory.h"
 #include "table/block_based/block_based_table_reader.h"
 #include "table/block_based/block_builder.h"
+#include "table/block_based/block_like_traits.h"
 #include "table/block_based/filter_block.h"
 #include "table/block_based/filter_policy_internal.h"
 #include "table/block_based/full_filter_block.h"
@@ -994,33 +995,34 @@ void BlockBasedTableBuilder::Flush() {
                                              r->get_offset());
     r->pc_rep->EmitBlock(block_rep);
   } else {
-    WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
+    WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
   }
 }
 
 void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                         BlockHandle* handle,
-                                        bool is_data_block) {
+                                        BlockType block_type) {
   block->Finish();
   std::string raw_block_contents;
   block->SwapAndReset(raw_block_contents);
   if (rep_->state == Rep::State::kBuffered) {
-    assert(is_data_block);
+    assert(block_type == BlockType::kData);
     rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
     rep_->data_begin_offset += rep_->data_block_buffers.back().size();
     return;
   }
-  WriteBlock(raw_block_contents, handle, is_data_block);
+  WriteBlock(raw_block_contents, handle, block_type);
 }
 
 void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
                                         BlockHandle* handle,
-                                        bool is_data_block) {
+                                        BlockType block_type) {
   Rep* r = rep_;
   assert(r->state == Rep::State::kUnbuffered);
   Slice block_contents;
   CompressionType type;
   Status compress_status;
+  bool is_data_block = block_type == BlockType::kData;
   CompressAndVerifyBlock(raw_block_contents, is_data_block,
                          *(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
                          &(r->compressed_output), &(block_contents), &type,
@@ -1030,8 +1032,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     return;
   }
 
-  WriteRawBlock(block_contents, type, handle, is_data_block,
-                &raw_block_contents);
+  WriteRawBlock(block_contents, type, handle, block_type, &raw_block_contents);
   r->compressed_output.clear();
   if (is_data_block) {
     if (r->filter_builder != nullptr) {
@@ -1189,9 +1190,10 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
 void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
                                            CompressionType type,
                                            BlockHandle* handle,
-                                           bool is_data_block,
+                                           BlockType block_type,
                                            const Slice* raw_block_contents) {
   Rep* r = rep_;
+  bool is_data_block = block_type == BlockType::kData;
   Status s = Status::OK();
   IOStatus io_s = IOStatus::OK();
   StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
@@ -1247,13 +1249,12 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
     io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
     if (io_s.ok()) {
       assert(s.ok());
-      if (is_data_block &&
-          r->table_options.prepopulate_block_cache ==
-              BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
+      if (r->table_options.prepopulate_block_cache ==
+          BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
         if (type == kNoCompression) {
-          s = InsertBlockInCache(block_contents, handle);
+          s = InsertBlockInCacheHelper(block_contents, handle, block_type);
         } else if (raw_block_contents != nullptr) {
-          s = InsertBlockInCache(*raw_block_contents, handle);
+          s = InsertBlockInCacheHelper(*raw_block_contents, handle, block_type);
         }
         if (!s.ok()) {
           r->SetStatus(s);
@@ -1328,10 +1329,9 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
     }
 
     r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
-    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
-                  &r->pending_handle, true /* is_data_block*/,
-                  &block_rep->contents);
+    WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
+                  &r->pending_handle, BlockType::kData, &block_rep->contents);
     if (!ok()) {
       break;
     }
@@ -1460,8 +1459,30 @@ Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
   return s;
 }
 
+Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
+    const Slice& block_contents, const BlockHandle* handle,
+    BlockType block_type) {
+  Status s;
+  if (block_type == BlockType::kData || block_type == BlockType::kIndex) {
+    s = InsertBlockInCache<Block>(block_contents, handle, block_type);
+  } else if (block_type == BlockType::kFilter) {
+    if (rep_->filter_builder->IsBlockBased()) {
+      s = InsertBlockInCache<Block>(block_contents, handle, block_type);
+    } else {
+      s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
+                                                    block_type);
+    }
+  } else if (block_type == BlockType::kCompressionDictionary) {
+    s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
+                                              block_type);
+  }
+  return s;
+}
+
+template <typename TBlocklike>
 Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
-                                                  const BlockHandle* handle) {
+                                                  const BlockHandle* handle,
+                                                  BlockType block_type) {
   // Uncompressed regular block cache
   Cache* block_cache = rep_->table_options.block_cache.get();
   Status s;
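[Reviewer note, not part of the patch] InsertBlockInCacheHelper above maps each BlockType onto the concrete C++ holder type the read path expects to find in the cache, then funnels everything through one templated insert. A toy version of that dispatch follows; the types are simplified stand-ins, not the real Block, ParsedFullFilterBlock, or UncompressionDict classes.

#include <iostream>
#include <string>

// Simplified stand-ins for the real cacheable types.
struct BlockSketch {
  static const char* Name() { return "Block"; }
};
struct FullFilterSketch {
  static const char* Name() { return "ParsedFullFilterBlock"; }
};
struct DictSketch {
  static const char* Name() { return "UncompressionDict"; }
};

enum class BlockType { kData, kIndex, kFilter, kCompressionDictionary };

// One templated insert path: the real code parses the bytes into TBlocklike
// via BlocklikeTraits, charges ApproximateMemoryUsage(), and inserts.
template <typename TBlocklike>
void InsertBlockInCache(const std::string& /*contents*/) {
  std::cout << "cache as " << TBlocklike::Name() << "\n";
}

void InsertBlockInCacheHelper(const std::string& contents, BlockType type,
                              bool block_based_filter) {
  if (type == BlockType::kData || type == BlockType::kIndex) {
    InsertBlockInCache<BlockSketch>(contents);
  } else if (type == BlockType::kFilter) {
    // Legacy block-based filters are plain blocks; full/partitioned filters
    // parse into their own holder type.
    if (block_based_filter) {
      InsertBlockInCache<BlockSketch>(contents);
    } else {
      InsertBlockInCache<FullFilterSketch>(contents);
    }
  } else if (type == BlockType::kCompressionDictionary) {
    InsertBlockInCache<DictSketch>(contents);
  }
}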
@@ -1479,15 +1500,25 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
     const size_t read_amp_bytes_per_bit =
         rep_->table_options.read_amp_bytes_per_bit;
-    Block* block = new Block(std::move(results), read_amp_bytes_per_bit);
-    size_t charge = block->ApproximateMemoryUsage();
-    s = block_cache->Insert(key, block, charge, &DeleteEntryCached<Block>);
-    if (s.ok()) {
-      BlockBasedTable::UpdateCacheInsertionMetrics(
-          BlockType::kData, nullptr /*get_context*/, charge,
-          s.IsOkOverwritten(), rep_->ioptions.stats);
-    } else {
-      RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
-    }
+
+    TBlocklike* block_holder = BlocklikeTraits<TBlocklike>::Create(
+        std::move(results), read_amp_bytes_per_bit,
+        rep_->ioptions.statistics.get(),
+        false /*rep_->blocks_definitely_zstd_compressed*/,
+        rep_->table_options.filter_policy.get());
+
+    if (block_holder->own_bytes()) {
+      size_t charge = block_holder->ApproximateMemoryUsage();
+      s = block_cache->Insert(key, block_holder, charge,
+                              &DeleteEntryCached<TBlocklike>);
+
+      if (s.ok()) {
+        BlockBasedTable::UpdateCacheInsertionMetrics(
+            block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
+            rep_->ioptions.stats);
+      } else {
+        RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
+      }
+    }
   }
   return s;
 }
@@ -1507,7 +1538,8 @@ void BlockBasedTableBuilder::WriteFilterBlock(
       rep_->filter_builder->Finish(filter_block_handle, &s);
       assert(s.ok() || s.IsIncomplete());
       rep_->props.filter_size += filter_content.size();
-      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
+      WriteRawBlock(filter_content, kNoCompression, &filter_block_handle,
+                    BlockType::kFilter);
     }
   }
   if (ok() && !empty_filter_block) {
@@ -1541,7 +1573,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
   if (ok()) {
     for (const auto& item : index_blocks.meta_blocks) {
       BlockHandle block_handle;
-      WriteBlock(item.second, &block_handle, false /* is_data_block */);
+      WriteBlock(item.second, &block_handle, BlockType::kIndex);
       if (!ok()) {
         break;
       }
@@ -1550,10 +1582,11 @@ void BlockBasedTableBuilder::WriteIndexBlock(
   }
   if (ok()) {
     if (rep_->table_options.enable_index_compression) {
-      WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
+      WriteBlock(index_blocks.index_block_contents, index_block_handle,
+                 BlockType::kIndex);
     } else {
       WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
-                    index_block_handle);
+                    index_block_handle, BlockType::kIndex);
     }
   }
   // If there are more index partitions, finish them and write them out
@@ -1567,10 +1600,10 @@ void BlockBasedTableBuilder::WriteIndexBlock(
       }
       if (rep_->table_options.enable_index_compression) {
         WriteBlock(index_blocks.index_block_contents, index_block_handle,
-                   false);
+                   BlockType::kIndex);
       } else {
         WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
-                      index_block_handle);
+                      index_block_handle, BlockType::kIndex);
       }
       // The last index_block_handle will be for the partition index block
     }
@@ -1665,7 +1698,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
                            &property_block_builder);
 
     WriteRawBlock(property_block_builder.Finish(), kNoCompression,
-                  &properties_block_handle);
+                  &properties_block_handle, BlockType::kProperties);
   }
   if (ok()) {
 #ifndef NDEBUG
@@ -1691,7 +1724,8 @@ void BlockBasedTableBuilder::WriteCompressionDictBlock(
     BlockHandle compression_dict_block_handle;
     if (ok()) {
       WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
-                    &compression_dict_block_handle);
+                    &compression_dict_block_handle,
+                    BlockType::kCompressionDictionary);
 #ifndef NDEBUG
       Slice compression_dict = rep_->compression_dict->GetRawDict();
       TEST_SYNC_POINT_CALLBACK(
@@ -1711,7 +1745,7 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
   if (ok() && !rep_->range_del_block.empty()) {
     BlockHandle range_del_block_handle;
     WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
-                  &range_del_block_handle);
+                  &range_del_block_handle, BlockType::kRangeDeletion);
     meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
   }
 }
@@ -1872,8 +1906,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
       }
       r->index_builder->OnKeyAdded(key);
     }
-    WriteBlock(Slice(data_block), &r->pending_handle,
-               true /* is_data_block */);
+    WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
     if (ok() && i + 1 < r->data_block_buffers.size()) {
      assert(next_block_iter != nullptr);
      Slice first_key_in_next_block = next_block_iter->key();
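[Reviewer note, not part of the patch] The InsertBlockInCache body above follows the standard rocksdb::Cache ownership pattern: charge the cache by the parsed object's ApproximateMemoryUsage() and register a typed deleter so the object is freed on eviction or erase. A standalone sketch against the public Cache API; ParsedBlockSketch is a made-up stand-in for a cacheable type.

#include <memory>
#include <string>

#include "rocksdb/cache.h"

// Made-up stand-in for a parsed, cache-resident block.
struct ParsedBlockSketch {
  std::string bytes;
  size_t ApproximateMemoryUsage() const {
    return bytes.size() + sizeof(*this);
  }
};

// Same shape as the builder's DeleteEntryCached<TBlocklike>.
template <typename T>
void DeleteEntryCached(const rocksdb::Slice& /*key*/, void* value) {
  delete static_cast<T*>(value);
}

int main() {
  std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(8 << 20);
  auto* block = new ParsedBlockSketch{std::string(4096, 'x')};
  size_t charge = block->ApproximateMemoryUsage();
  // Ownership passes to the cache via the deleter; with no handle requested,
  // even an entry that cannot be retained is freed through the deleter, so
  // there is no manual delete here.
  rocksdb::Status s = cache->Insert("sst1/offset42", block, charge,
                                    &DeleteEntryCached<ParsedBlockSketch>);
  return s.ok() ? 0 : 1;
}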
@@ -1935,7 +1968,7 @@ Status BlockBasedTableBuilder::Finish() {
   if (ok()) {
     // flush the meta index block
     WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
-                  &metaindex_block_handle);
+                  &metaindex_block_handle, BlockType::kMetaIndex);
   }
   if (ok()) {
     WriteFooter(metaindex_block_handle, index_block_handle);
diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h
index 65be35b19..ed91dbf32 100644
--- a/table/block_based/block_based_table_builder.h
+++ b/table/block_based/block_based_table_builder.h
@@ -9,6 +9,7 @@
 #pragma once
 #include <stdint.h>
+
 #include <limits>
 #include <string>
 #include <utility>
 #include <vector>
@@ -108,20 +109,27 @@ class BlockBasedTableBuilder : public TableBuilder {
   // Call block's Finish() method and then
   // - in buffered mode, buffer the uncompressed block contents.
   // - in unbuffered mode, write the compressed block contents to file.
-  void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
+  void WriteBlock(BlockBuilder* block, BlockHandle* handle,
+                  BlockType block_type);
 
   // Compress and write block content to the file.
   void WriteBlock(const Slice& block_contents, BlockHandle* handle,
-                  bool is_data_block);
+                  BlockType block_type);
 
   // Directly write data to the file.
   void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
-                     bool is_data_block = false,
-                     const Slice* raw_data = nullptr);
+                     BlockType block_type, const Slice* raw_data = nullptr);
 
   void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);
 
+  template <typename TBlocklike>
   Status InsertBlockInCache(const Slice& block_contents,
-                            const BlockHandle* handle);
+                            const BlockHandle* handle, BlockType block_type);
+
+  Status InsertBlockInCacheHelper(const Slice& block_contents,
+                                  const BlockHandle* handle,
+                                  BlockType block_type);
+
   Status InsertBlockInCompressedCache(const Slice& block_contents,
                                       const CompressionType type,
                                       const BlockHandle* handle);
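[Reviewer note, not part of the patch] One consequence of dropping the default `bool is_data_block = false` argument: every call site now states its block type, and WriteRawBlock can pass every block it writes to the cache helper unconditionally, including types the helper has no case for. Those fall through and return OK, so meta-index, properties, and range-deletion blocks are written but never cached. A tiny sketch of that fall-through, with stand-in types:

#include <cassert>

enum class BlockType { kData, kFilter, kMetaIndex, kProperties };

struct StatusSketch {
  bool ok = true;
};

// Mirrors the helper's shape: unhandled types fall through and report OK.
StatusSketch InsertBlockInCacheHelperSketch(BlockType type) {
  StatusSketch s;
  if (type == BlockType::kData) {
    // ... parse and insert into the block cache ...
  } else if (type == BlockType::kFilter) {
    // ... filter-specific insert ...
  }
  return s;  // kMetaIndex / kProperties: no-op, s stays OK
}

int main() {
  assert(InsertBlockInCacheHelperSketch(BlockType::kMetaIndex).ok);
  return 0;
}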