From 88ba331c1acc90fc80c0d8d7821556d3786467dc Mon Sep 17 00:00:00 2001 From: Kai Liu Date: Tue, 12 Nov 2013 22:46:51 -0800 Subject: [PATCH] Add the index/filter block cache Summary: This diff leverage the existing block cache and extend it to cache index/filter block. Test Plan: Added new tests in db_test and table_test The correctness is checked by: 1. make check 2. make valgrind_check Performance is test by: 1. 10 times of build_tools/regression_build_test.sh on two versions of rocksdb before/after the code change. Test results suggests no significant difference between them. For the two key operatons `overwrite` and `readrandom`, the average iops are both 20k and ~260k, with very small variance). 2. db_stress. Reviewers: dhruba Reviewed By: dhruba CC: leveldb, haobo, xjin Differential Revision: https://reviews.facebook.net/D13167 --- db/db_test.cc | 108 +++- include/rocksdb/options.h | 3 +- include/rocksdb/statistics.h | 32 +- table/block_based_table_reader.cc | 833 +++++++++++++++++++++--------- table/block_based_table_reader.h | 49 ++ table/filter_block.cc | 6 +- table/filter_block.h | 9 +- table/table_test.cc | 189 ++++++- tools/db_stress.cc | 10 +- 9 files changed, 945 insertions(+), 294 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 1556a932c..048256549 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -698,6 +698,63 @@ TEST(DBTest, ReadWrite) { } while (ChangeOptions()); } +// Make sure that when options.block_cache is set, after a new table is +// created its index/filter blocks are added to block cache. +TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) { + Options options = CurrentOptions(); + std::unique_ptr filter_policy(NewBloomFilterPolicy(20)); + options.filter_policy = filter_policy.get(); + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + DestroyAndReopen(&options); + + ASSERT_OK(db_->Put(WriteOptions(), "key", "val")); + // Create a new talbe. + dbfull()->Flush(FlushOptions()); + + // index/filter blocks added to block cache right after table creation. + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(2, /* only index/filter were added */ + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); + ASSERT_EQ(0, + options.statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS)); + + // Make sure filter block is in cache. + std::string value; + ReadOptions ropt; + db_->KeyMayExist(ReadOptions(), "key", &value); + + // Miss count should remain the same. + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT)); + + db_->KeyMayExist(ReadOptions(), "key", &value); + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(2, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT)); + + // Make sure index block is in cache. + auto index_block_hit = + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT); + value = Get("key"); + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(index_block_hit + 1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT)); + + value = Get("key"); + ASSERT_EQ(1, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(index_block_hit + 2, + options.statistics.get()->getTickerCount(BLOCK_CACHE_FILTER_HIT)); +} + static std::string Key(int i) { char buf[100]; snprintf(buf, sizeof(buf), "key%06d", i); @@ -768,6 +825,7 @@ TEST(DBTest, PutDeleteGet) { } while (ChangeOptions()); } + TEST(DBTest, GetFromImmutableLayer) { do { Options options = CurrentOptions(); @@ -917,43 +975,46 @@ TEST(DBTest, KeyMayExist) { value.clear(); long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); - long cache_miss = - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + long cache_added = + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD); ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); ASSERT_TRUE(!value_found); // assert that no new files were opened and no new blocks were // read into block cache. ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); - ASSERT_EQ(cache_miss, - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + ASSERT_EQ(cache_added, + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); ASSERT_OK(db_->Delete(WriteOptions(), "a")); numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); - cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + cache_added = + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); - ASSERT_EQ(cache_miss, - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + ASSERT_EQ(cache_added, + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); dbfull()->Flush(FlushOptions()); dbfull()->CompactRange(nullptr, nullptr); numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); - cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + cache_added = + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); - ASSERT_EQ(cache_miss, - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + ASSERT_EQ(cache_added, + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); ASSERT_OK(db_->Delete(WriteOptions(), "c")); numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); - cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + cache_added = + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD); ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value)); ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); - ASSERT_EQ(cache_miss, - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + ASSERT_EQ(cache_added, + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); delete options.filter_policy; } while (ChangeOptions()); @@ -987,8 +1048,8 @@ TEST(DBTest, NonBlockingIteration) { // verify that a non-blocking iterator does not find any // kvs. Neither does it do any IOs to storage. long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); - long cache_miss = - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + long cache_added = + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD); iter = db_->NewIterator(non_blocking_opts); count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -997,8 +1058,8 @@ TEST(DBTest, NonBlockingIteration) { ASSERT_EQ(count, 0); ASSERT_TRUE(iter->status().IsIncomplete()); ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); - ASSERT_EQ(cache_miss, - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + ASSERT_EQ(cache_added, + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); delete iter; // read in the specified block via a regular get @@ -1006,7 +1067,8 @@ TEST(DBTest, NonBlockingIteration) { // verify that we can find it via a non-blocking scan numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); - cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + cache_added = + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD); iter = db_->NewIterator(non_blocking_opts); count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { @@ -1015,8 +1077,8 @@ TEST(DBTest, NonBlockingIteration) { } ASSERT_EQ(count, 1); ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); - ASSERT_EQ(cache_miss, - options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + ASSERT_EQ(cache_added, + options.statistics.get()->getTickerCount(BLOCK_CACHE_ADD)); delete iter; } while (ChangeOptions()); @@ -3534,7 +3596,7 @@ TEST(DBTest, BloomFilter) { env_->count_random_reads_ = true; Options options = CurrentOptions(); options.env = env_; - options.block_cache = NewLRUCache(0); // Prevent cache hits + options.no_block_cache = true; options.filter_policy = NewBloomFilterPolicy(10); Reopen(&options); @@ -4128,7 +4190,7 @@ TEST(DBTest, ReadCompaction) { options.write_buffer_size = 64 * 1024; options.filter_policy = nullptr; options.block_size = 4096; - options.block_cache = NewLRUCache(0); // Prevent cache hits + options.no_block_cache = true; Reopen(&options); @@ -4708,7 +4770,7 @@ TEST(DBTest, PrefixScan) { env_->count_random_reads_ = true; Options options = CurrentOptions(); options.env = env_; - options.block_cache = NewLRUCache(0); // Prevent cache hits + options.no_block_cache = true; options.filter_policy = NewBloomFilterPolicy(10); options.prefix_extractor = prefix_extractor; options.whole_key_filtering = false; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 5aca34065..43a0bcd91 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -653,7 +653,8 @@ struct ReadOptions { // Default: false bool verify_checksums; - // Should the data read for this iteration be cached in memory? + // Should the "data block"/"index block"/"filter block" read for this + // iteration be cached in memory? // Callers may wish to set this field to false for bulk scans. // Default: true bool fill_cache; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 44cf1d3ab..dd273af1b 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -23,9 +23,32 @@ namespace rocksdb { * And incrementing TICKER_ENUM_MAX. */ enum Tickers { + // total block cache misses + // REQUIRES: BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + + // BLOCK_CACHE_FILTER_MISS + + // BLOCK_CACHE_DATA_MISS; BLOCK_CACHE_MISS, + // total block cache hit + // REQUIRES: BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + + // BLOCK_CACHE_FILTER_HIT + + // BLOCK_CACHE_DATA_HIT; BLOCK_CACHE_HIT, - BLOOM_FILTER_USEFUL, // no. of times bloom filter has avoided file reads. + // # of blocks added to block cache. + BLOCK_CACHE_ADD, + // # of times cache miss when accessing index block from block cache. + BLOCK_CACHE_INDEX_MISS, + // # of times cache hit when accessing index block from block cache. + BLOCK_CACHE_INDEX_HIT, + // # of times cache miss when accessing filter block from block cache. + BLOCK_CACHE_FILTER_MISS, + // # of times cache hit when accessing filter block from block cache. + BLOCK_CACHE_FILTER_HIT, + // # of times cache miss when accessing data block from block cache. + BLOCK_CACHE_DATA_MISS, + // # of times cache hit when accessing data block from block cache. + BLOCK_CACHE_DATA_HIT, + // # of times bloom filter has avoided file reads. + BLOOM_FILTER_USEFUL, /** * COMPACTION_KEY_DROP_* count the reasons for key drop during compaction @@ -93,6 +116,13 @@ enum Tickers { const std::vector> TickersNameMap = { { BLOCK_CACHE_MISS, "rocksdb.block.cache.miss" }, { BLOCK_CACHE_HIT, "rocksdb.block.cache.hit" }, + { BLOCK_CACHE_ADD, "rocksdb.block.cache.add" }, + { BLOCK_CACHE_INDEX_MISS, "rocksdb.block.cache.index.miss" }, + { BLOCK_CACHE_INDEX_HIT, "rocksdb.block.cache.index.hit" }, + { BLOCK_CACHE_FILTER_MISS, "rocksdb.block.cache.filter.miss" }, + { BLOCK_CACHE_FILTER_HIT, "rocksdb.block.cache.filter.hit" }, + { BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss" }, + { BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit" }, { BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful" }, { COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new" }, { COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete" }, diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index fdcfe6ba8..54b75336c 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -33,13 +33,9 @@ namespace rocksdb { // We are using the fact that we know for Posix files the unique ID is three // varints. const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; +using std::unique_ptr; struct BlockBasedTable::Rep { - ~Rep() { - delete filter; - delete [] filter_data; - delete index_block; - } Rep(const EnvOptions& storage_options) : soptions(storage_options) { } @@ -52,11 +48,16 @@ struct BlockBasedTable::Rep { size_t cache_key_prefix_size; char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize]; size_t compressed_cache_key_prefix_size; - FilterBlockReader* filter; - const char* filter_data; - BlockHandle metaindex_handle; // Handle to metaindex_block: saved from footer - Block* index_block; + // Handle to metaindex_block: saved from footer + BlockHandle metaindex_handle; + // Handle to index: saved from footer + BlockHandle index_handle; + // index_block will be populated and used only when options.block_cache is + // NULL; otherwise we will get the index block via the block cache. + unique_ptr index_block; + unique_ptr filter; + TableStats table_stats; }; @@ -64,6 +65,30 @@ BlockBasedTable::~BlockBasedTable() { delete rep_; } +// CachableEntry represents the entries that *may* be fetched from block cache. +// field `value` is the item we want to get. +// field `cache_handle` is the cache handle to the block cache. If the value +// was not read from cache, `cache_handle` will be nullptr. +template +struct BlockBasedTable::CachableEntry { + CachableEntry(TValue* value, Cache::Handle* cache_handle) + : value(value) + , cache_handle(cache_handle) { + } + CachableEntry(): CachableEntry(nullptr, nullptr) { } + void Release(Cache* cache) { + if (cache_handle) { + cache->Release(cache_handle); + value = nullptr; + cache_handle = nullptr; + } + } + + TValue* value = nullptr; + // if the entry is from the cache, cache_handle will be populated. + Cache::Handle* cache_handle = nullptr; +}; + // Helper function to setup the cache key's prefix for the Table. void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { assert(kMaxCacheKeyPrefixSize >= 10); @@ -116,13 +141,14 @@ namespace { // anonymous namespace, not visible externally // Set *didIO to true if didIO is not null. // On failure return non-OK. // On success fill *result and return OK - caller owns *result -Status ReadBlock(RandomAccessFile* file, - const ReadOptions& options, - const BlockHandle& handle, - Block** result, - Env* env, - bool* didIO = nullptr, - bool do_uncompress = true) { +Status ReadBlockFromFile( + RandomAccessFile* file, + const ReadOptions& options, + const BlockHandle& handle, + Block** result, + Env* env, + bool* didIO = nullptr, + bool do_uncompress = true) { BlockContents contents; Status s = ReadBlockContents(file, options, handle, &contents, env, do_uncompress); @@ -136,6 +162,62 @@ Status ReadBlock(RandomAccessFile* file, return s; } +void DeleteBlock(void* arg, void* ignored) { + delete reinterpret_cast(arg); +} + +void DeleteCachedBlock(const Slice& key, void* value) { + Block* block = reinterpret_cast(value); + delete block; +} + +void DeleteCachedFilter(const Slice& key, void* value) { + auto filter = reinterpret_cast(value); + delete filter; +} + +void ReleaseBlock(void* arg, void* h) { + Cache* cache = reinterpret_cast(arg); + Cache::Handle* handle = reinterpret_cast(h); + cache->Release(handle); +} + +Slice GetCacheKey(const char* cache_key_prefix, + size_t cache_key_prefix_size, + const BlockHandle& handle, + char* cache_key) { + assert(cache_key != nullptr); + assert(cache_key_prefix_size != 0); + assert(cache_key_prefix_size <= kMaxCacheKeyPrefixSize); + memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + cache_key_prefix_size, + handle.offset()); + return Slice(cache_key, static_cast(end - cache_key)); +} + +Cache::Handle* GetFromBlockCache( + Cache* block_cache, + const Slice& key, + Tickers block_cache_miss_ticker, + Tickers block_cache_hit_ticker, + std::shared_ptr statistics) { + auto cache_handle = block_cache->Lookup(key); + if (cache_handle != nullptr) { + BumpPerfCount(&perf_context.block_cache_hit_count); + // overall cache hit + RecordTick(statistics, BLOCK_CACHE_HIT); + // block-type specific cache hit + RecordTick(statistics, block_cache_hit_ticker); + } else { + // overall cache miss + RecordTick(statistics, BLOCK_CACHE_MISS); + // block-type specific cache miss + RecordTick(statistics, block_cache_miss_ticker); + } + + return cache_handle; +} + } // end of anonymous namespace Status BlockBasedTable::Open(const Options& options, @@ -164,27 +246,85 @@ Status BlockBasedTable::Open(const Options& options, s = footer.DecodeFrom(&footer_input); if (!s.ok()) return s; - Block* index_block = nullptr; - // TODO: we never really verify check sum for index block - s = ReadBlock(file.get(), ReadOptions(), footer.index_handle(), &index_block, - options.env); + // We've successfully read the footer and the index block: we're + // ready to serve requests. + Rep* rep = new BlockBasedTable::Rep(soptions); + rep->options = options; + rep->file = std::move(file); + rep->metaindex_handle = footer.metaindex_handle(); + rep->index_handle = footer.index_handle(); + SetupCacheKeyPrefix(rep); + unique_ptr new_table(new BlockBasedTable(rep)); + + // Read meta index + std::unique_ptr meta; + std::unique_ptr meta_iter; + s = ReadMetaBlock(rep, &meta, &meta_iter); + + // Read the stats + meta_iter->Seek(kStatsBlock); + if (meta_iter->Valid() && meta_iter->key() == Slice(kStatsBlock)) { + s = meta_iter->status(); + if (s.ok()) { + s = ReadStats(meta_iter->value(), rep, &rep->table_stats); + } + + if (!s.ok()) { + auto err_msg = + "[Warning] Encountered error while reading data from stats block " + + s.ToString(); + Log(rep->options.info_log, err_msg.c_str()); + } + } + + // Initialize index/filter blocks. If block cache is not specified, + // these blocks will be kept in member variables in Rep, which will + // reside in the memory as long as this table object is alive; otherwise + // they will be added to block cache. + if (!options.block_cache) { + Block* index_block = nullptr; + // TODO: we never really verify check sum for index block + s = ReadBlockFromFile( + rep->file.get(), + ReadOptions(), + footer.index_handle(), + &index_block, + options.env + ); + + if (s.ok()) { + assert(index_block->compressionType() == kNoCompression); + rep->index_block.reset(index_block); + + // Set index block + if (rep->options.filter_policy) { + std::string key = kFilterBlockPrefix; + key.append(rep->options.filter_policy->Name()); + meta_iter->Seek(key); + + if (meta_iter->Valid() && meta_iter->key() == Slice(key)) { + rep->filter.reset(ReadFilter(meta_iter->value(), rep)); + } + } + } else { + delete index_block; + } + } else { + // Call IndexBlockReader() to implicitly add index to the block_cache + unique_ptr iter( + new_table->IndexBlockReader(ReadOptions()) + ); + s = iter->status(); + + if (s.ok()) { + // Call GetFilter() to implicitly add filter to the block_cache + auto filter_entry = new_table->GetFilter(); + filter_entry.Release(options.block_cache.get()); + } + } if (s.ok()) { - // We've successfully read the footer and the index block: we're - // ready to serve requests. - assert(index_block->compressionType() == kNoCompression); - BlockBasedTable::Rep* rep = new BlockBasedTable::Rep(soptions); - rep->options = options; - rep->file = std::move(file); - rep->metaindex_handle = footer.metaindex_handle(); - rep->index_block = index_block; - SetupCacheKeyPrefix(rep); - rep->filter_data = nullptr; - rep->filter = nullptr; - table_reader->reset(new BlockBasedTable(rep)); - ((BlockBasedTable*) (table_reader->get()))->ReadMeta(footer); - } else { - if (index_block) delete index_block; + *table_reader = std::move(new_table); } return s; @@ -213,72 +353,71 @@ TableStats& BlockBasedTable::GetTableStats() { return rep_->table_stats; } -void BlockBasedTable::ReadMeta(const Footer& footer) { +// Load the meta-block from the file. On success, return the loaded meta block +// and its iterator. +Status BlockBasedTable::ReadMetaBlock( + Rep* rep, + std::unique_ptr* meta_block, + std::unique_ptr* iter) { // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates // it is an empty block. // TODO: we never really verify check sum for meta index block Block* meta = nullptr; - if (!ReadBlock(rep_->file.get(), ReadOptions(), footer.metaindex_handle(), - &meta, rep_->options.env).ok()) { - // Do not propagate errors since meta info is not needed for operation - return; - } - assert(meta->compressionType() == kNoCompression); - - Iterator* iter = meta->NewIterator(BytewiseComparator()); - // read filter - if (rep_->options.filter_policy) { - std::string key = kFilterBlockPrefix; - key.append(rep_->options.filter_policy->Name()); - iter->Seek(key); - - if (iter->Valid() && iter->key() == Slice(key)) { - ReadFilter(iter->value()); - } - } - - // read stats - iter->Seek(kStatsBlock); - if (iter->Valid() && iter->key() == Slice(kStatsBlock)) { - auto s = iter->status(); - if (s.ok()) { - s = ReadStats(iter->value(), rep_); - } + Status s = ReadBlockFromFile( + rep->file.get(), + ReadOptions(), + rep->metaindex_handle, + &meta, + rep->options.env); if (!s.ok()) { auto err_msg = "[Warning] Encountered error while reading data from stats block " + s.ToString(); - Log(rep_->options.info_log, "%s", err_msg.c_str()); + Log(rep->options.info_log, "%s", err_msg.c_str()); } + if (!s.ok()) { + delete meta; + return s; } - delete iter; - delete meta; + meta_block->reset(meta); + // meta block uses bytewise comparator. + iter->reset(meta->NewIterator(BytewiseComparator())); + return Status::OK(); } -void BlockBasedTable::ReadFilter(const Slice& filter_handle_value) { +FilterBlockReader* BlockBasedTable::ReadFilter ( + const Slice& filter_handle_value, + BlockBasedTable::Rep* rep, + size_t* filter_size) { Slice v = filter_handle_value; BlockHandle filter_handle; if (!filter_handle.DecodeFrom(&v).ok()) { - return; + return nullptr; } - // TODO: We might want to unify with ReadBlock() if we start - // requiring checksum verification in BlockBasedTable::Open. + // TODO: We might want to unify with ReadBlockFromFile() if we start + // requiring checksum verification in Table::Open. ReadOptions opt; BlockContents block; - if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block, - rep_->options.env, false).ok()) { - return; + if (!ReadBlockContents(rep->file.get(), opt, filter_handle, &block, + rep->options.env, false).ok()) { + return nullptr; } - if (block.heap_allocated) { - rep_->filter_data = block.data.data(); // Will need to delete later + + if (filter_size) { + *filter_size = block.data.size(); } - rep_->filter = new FilterBlockReader(rep_->options, block.data); + + return new FilterBlockReader( + rep->options, block.data, block.heap_allocated); } -Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) { +Status BlockBasedTable::ReadStats( + const Slice& handle_value, Rep* rep, TableStats* table_stats) { + assert(table_stats); + Slice v = handle_value; BlockHandle handle; if (!handle.DecodeFrom(&v).ok()) { @@ -304,15 +443,15 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) { stats_block.NewIterator(BytewiseComparator()) ); - auto& table_stats = rep->table_stats; // All pre-defined stats of type uint64_t std::unordered_map predefined_uint64_stats = { - { BlockBasedTableStatsNames::kDataSize, &table_stats.data_size }, - { BlockBasedTableStatsNames::kIndexSize, &table_stats.index_size }, - { BlockBasedTableStatsNames::kRawKeySize, &table_stats.raw_key_size }, - { BlockBasedTableStatsNames::kRawValueSize, &table_stats.raw_value_size }, - { BlockBasedTableStatsNames::kNumDataBlocks, &table_stats.num_data_blocks}, - { BlockBasedTableStatsNames::kNumEntries, &table_stats.num_entries }, + { BlockBasedTableStatsNames::kDataSize, &table_stats->data_size }, + { BlockBasedTableStatsNames::kIndexSize, &table_stats->index_size }, + { BlockBasedTableStatsNames::kRawKeySize, &table_stats->raw_key_size }, + { BlockBasedTableStatsNames::kRawValueSize, &table_stats->raw_value_size }, + { BlockBasedTableStatsNames::kNumDataBlocks, + &table_stats->num_data_blocks }, + { BlockBasedTableStatsNames::kNumEntries, &table_stats->num_entries }, }; std::string last_key; @@ -346,11 +485,11 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) { } *(pos->second) = val; } else if (key == BlockBasedTableStatsNames::kFilterPolicy) { - table_stats.filter_policy_name = raw_val.ToString(); + table_stats->filter_policy_name = raw_val.ToString(); } else { // handle user-collected - table_stats.user_collected_stats.insert( - std::make_pair(iter->key().ToString(), raw_val.ToString()) + table_stats->user_collected_stats.insert( + std::make_pair(key, raw_val.ToString()) ); } } @@ -358,19 +497,81 @@ Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) { return s; } -static void DeleteBlock(void* arg, void* ignored) { - delete reinterpret_cast(arg); -} +Status BlockBasedTable::GetBlock( + const BlockBasedTable* table, + const BlockHandle& handle, + const ReadOptions& options, + const bool for_compaction, + const Tickers block_cache_miss_ticker, + const Tickers block_cache_hit_ticker, + bool* didIO, + CachableEntry* entry) { + bool no_io = options.read_tier == kBlockCacheTier; + Cache* block_cache = table->rep_->options.block_cache.get(); + auto statistics = table->rep_->options.statistics; + Status s; -static void DeleteCachedBlock(const Slice& key, void* value) { - Block* block = reinterpret_cast(value); - delete block; -} + if (block_cache != nullptr) { + char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = GetCacheKey( + table->rep_->cache_key_prefix, + table->rep_->cache_key_prefix_size, + handle, + cache_key + ); -static void ReleaseBlock(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle); + entry->cache_handle = GetFromBlockCache( + block_cache, + key, + block_cache_miss_ticker, + block_cache_hit_ticker, + table->rep_->options.statistics + ); + + if (entry->cache_handle != nullptr) { + entry->value = + reinterpret_cast(block_cache->Value(entry->cache_handle)); + } else if (no_io) { + // Did not find in block_cache and can't do IO + return Status::Incomplete("no blocking io"); + } else { + Histograms histogram = for_compaction ? + READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; + { + // block for stop watch + StopWatch sw(table->rep_->options.env, statistics, histogram); + s = ReadBlockFromFile( + table->rep_->file.get(), + options, + handle, + &entry->value, + table->rep_->options.env, + didIO + ); + } + if (s.ok()) { + if (options.fill_cache && entry->value->isCachable()) { + entry->cache_handle = block_cache->Insert( + key, entry->value, entry->value->size(), &DeleteCachedBlock); + RecordTick(statistics, BLOCK_CACHE_ADD); + } + } + } + } else if (no_io) { + // Could not read from block_cache and can't do IO + return Status::Incomplete("no blocking io"); + } else { + s = ReadBlockFromFile( + table->rep_->file.get(), + options, + handle, + &entry->value, + table->rep_->options.env, + didIO + ); + } + + return s; } // Convert an index iterator value (i.e., an encoded BlockHandle) @@ -397,153 +598,160 @@ Iterator* BlockBasedTable::BlockReader(void* arg, // We intentionally allow extra stuff in index_value so that we // can add more features in the future. - if (s.ok()) { - if (block_cache != nullptr || block_cache_compressed != nullptr) { - char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - char* end = cache_key; + if (!s.ok()) { + return NewErrorIterator(s); + } - // create key for block cache - if (block_cache != nullptr) { - assert(table->rep_->cache_key_prefix_size != 0); - assert(table->rep_->cache_key_prefix_size <= kMaxCacheKeyPrefixSize); - memcpy(cache_key, table->rep_->cache_key_prefix, - table->rep_->cache_key_prefix_size); - end = EncodeVarint64(cache_key + table->rep_->cache_key_prefix_size, - handle.offset()); - } - Slice key(cache_key, static_cast(end - cache_key)); + if (block_cache != nullptr || block_cache_compressed != nullptr) { + char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + Slice key, /* key to the block cache */ + ckey /* key to the compressed block cache */ ; - // create key for compressed block cache - end = compressed_cache_key; - if (block_cache_compressed != nullptr) { - assert(table->rep_->compressed_cache_key_prefix_size != 0); - assert(table->rep_->compressed_cache_key_prefix_size <= - kMaxCacheKeyPrefixSize); - memcpy(compressed_cache_key, table->rep_->compressed_cache_key_prefix, - table->rep_->compressed_cache_key_prefix_size); - end = EncodeVarint64(compressed_cache_key + - table->rep_->compressed_cache_key_prefix_size, - handle.offset()); - } - Slice ckey(compressed_cache_key, static_cast - (end - compressed_cache_key)); + // create key for block cache + if (block_cache != nullptr) { + key = GetCacheKey( + table->rep_->cache_key_prefix, + table->rep_->cache_key_prefix_size, + handle, + cache_key + ); + } - // Lookup uncompressed cache first - if (block_cache != nullptr) { - cache_handle = block_cache->Lookup(key); - if (cache_handle != nullptr) { - block = reinterpret_cast(block_cache->Value(cache_handle)); - RecordTick(statistics, BLOCK_CACHE_HIT); - } - } + if (block_cache_compressed != nullptr) { + ckey = GetCacheKey( + table->rep_->compressed_cache_key_prefix, + table->rep_->compressed_cache_key_prefix_size, + handle, + compressed_cache_key + ); + } - // If not found in uncompressed cache, lookup compressed cache - if (block == nullptr && block_cache_compressed != nullptr) { - compressed_cache_handle = block_cache_compressed->Lookup(ckey); - - // if we found in the compressed cache, then uncompress and - // insert into uncompressed cache - if (compressed_cache_handle != nullptr) { - // found compressed block - cblock = reinterpret_cast(block_cache_compressed-> - Value(compressed_cache_handle)); - assert(cblock->compressionType() != kNoCompression); - - // Retrieve the uncompressed contents into a new buffer - BlockContents contents; - s = UncompressBlockContents(cblock->data(), cblock->size(), - &contents); - - // Insert uncompressed block into block cache - if (s.ok()) { - block = new Block(contents); // uncompressed block - assert(block->compressionType() == kNoCompression); - if (block_cache != nullptr && block->isCachable() && - options.fill_cache) { - cache_handle = block_cache->Insert(key, block, block->size(), - &DeleteCachedBlock); - assert(reinterpret_cast(block_cache->Value(cache_handle)) - == block); - } - } - // Release hold on compressed cache entry - block_cache_compressed->Release(compressed_cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); - } - } - - if (block != nullptr) { - BumpPerfCount(&perf_context.block_cache_hit_count); - } else if (no_io) { - // Did not find in block_cache and can't do IO - return NewErrorIterator(Status::Incomplete("no blocking io")); + // Lookup uncompressed cache first + if (block_cache != nullptr) { + assert(!key.empty()); + cache_handle = block_cache->Lookup(key); + if (cache_handle != nullptr) { + block = reinterpret_cast(block_cache->Value(cache_handle)); + RecordTick(statistics, BLOCK_CACHE_HIT); + RecordTick(statistics, BLOCK_CACHE_DATA_HIT); } else { - - Histograms histogram = for_compaction ? - READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; - { // block for stop watch - StopWatch sw(table->rep_->options.env, statistics, histogram); - s = ReadBlock( - table->rep_->file.get(), - options, - handle, - &cblock, - table->rep_->options.env, - didIO, - block_cache_compressed == nullptr - ); - } - if (s.ok()) { - assert(cblock->compressionType() == kNoCompression || - block_cache_compressed != nullptr); - - // Retrieve the uncompressed contents into a new buffer - BlockContents contents; - if (cblock->compressionType() != kNoCompression) { - s = UncompressBlockContents(cblock->data(), cblock->size(), - &contents); - } - if (s.ok()) { - if (cblock->compressionType() != kNoCompression) { - block = new Block(contents); // uncompressed block - } else { - block = cblock; - cblock = nullptr; - } - if (block->isCachable() && options.fill_cache) { - // Insert compressed block into compressed block cache. - // Release the hold on the compressed cache entry immediately. - if (block_cache_compressed != nullptr && cblock != nullptr) { - compressed_cache_handle = block_cache_compressed->Insert( - ckey, cblock, cblock->size(), &DeleteCachedBlock); - block_cache_compressed->Release(compressed_cache_handle); - RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); - cblock = nullptr; - } - // insert into uncompressed block cache - assert((block->compressionType() == kNoCompression)); - if (block_cache != nullptr) { - cache_handle = block_cache->Insert( - key, block, block->size(), &DeleteCachedBlock); - RecordTick(statistics, BLOCK_CACHE_MISS); - assert(reinterpret_cast(block_cache->Value( - cache_handle))== block); - } - } - } - } - if (cblock != nullptr) { - delete cblock; - } + RecordTick(statistics, BLOCK_CACHE_MISS); + RecordTick(statistics, BLOCK_CACHE_DATA_MISS); } + } + + // If not found in uncompressed cache, lookup compressed cache + if (block == nullptr && block_cache_compressed != nullptr) { + assert(!ckey.empty()); + compressed_cache_handle = block_cache_compressed->Lookup(ckey); + + // if we found in the compressed cache, then uncompress and + // insert into uncompressed cache + if (compressed_cache_handle != nullptr) { + // found compressed block + cblock = reinterpret_cast(block_cache_compressed-> + Value(compressed_cache_handle)); + assert(cblock->compressionType() != kNoCompression); + + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + s = UncompressBlockContents(cblock->data(), cblock->size(), + &contents); + + // Insert uncompressed block into block cache + if (s.ok()) { + block = new Block(contents); // uncompressed block + assert(block->compressionType() == kNoCompression); + if (block_cache != nullptr && block->isCachable() && + options.fill_cache) { + cache_handle = block_cache->Insert(key, block, block->size(), + &DeleteCachedBlock); + assert(reinterpret_cast(block_cache->Value(cache_handle)) + == block); + } + } + // Release hold on compressed cache entry + block_cache_compressed->Release(compressed_cache_handle); + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT); + } + } + + if (block != nullptr) { + BumpPerfCount(&perf_context.block_cache_hit_count); } else if (no_io) { - // Could not read from block_cache and can't do IO + // Did not find in block_cache and can't do IO return NewErrorIterator(Status::Incomplete("no blocking io")); } else { - s = ReadBlock(table->rep_->file.get(), options, handle, &block, - table->rep_->options.env, didIO); + Histograms histogram = for_compaction ? + READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; + { // block for stop watch + StopWatch sw(table->rep_->options.env, statistics, histogram); + s = ReadBlockFromFile( + table->rep_->file.get(), + options, + handle, + &cblock, + table->rep_->options.env, + didIO, + block_cache_compressed == nullptr + ); + } + if (s.ok()) { + assert(cblock->compressionType() == kNoCompression || + block_cache_compressed != nullptr); + + // Retrieve the uncompressed contents into a new buffer + BlockContents contents; + if (cblock->compressionType() != kNoCompression) { + s = UncompressBlockContents(cblock->data(), cblock->size(), + &contents); + } + if (s.ok()) { + if (cblock->compressionType() != kNoCompression) { + block = new Block(contents); // uncompressed block + } else { + block = cblock; + cblock = nullptr; + } + if (block->isCachable() && options.fill_cache) { + // Insert compressed block into compressed block cache. + // Release the hold on the compressed cache entry immediately. + if (block_cache_compressed != nullptr && cblock != nullptr) { + compressed_cache_handle = block_cache_compressed->Insert( + ckey, cblock, cblock->size(), &DeleteCachedBlock); + block_cache_compressed->Release(compressed_cache_handle); + RecordTick(statistics, BLOCK_CACHE_COMPRESSED_MISS); + cblock = nullptr; + } + // insert into uncompressed block cache + assert((block->compressionType() == kNoCompression)); + if (block_cache != nullptr) { + cache_handle = block_cache->Insert( + key, block, block->size(), &DeleteCachedBlock); + RecordTick(statistics, BLOCK_CACHE_ADD); + assert(reinterpret_cast(block_cache->Value( + cache_handle))== block); + } + } + } + } + if (cblock != nullptr) { + delete cblock; + } } + } else if (no_io) { + // Could not read from block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); + } else { + s = ReadBlockFromFile( + table->rep_->file.get(), + options, + handle, + &block, + table->rep_->options.env, + didIO + ); } Iterator* iter; @@ -560,6 +768,102 @@ Iterator* BlockBasedTable::BlockReader(void* arg, return iter; } +BlockBasedTable::CachableEntry +BlockBasedTable::GetFilter(bool no_io) const { + if (!rep_->options.filter_policy || !rep_->options.block_cache) { + return {rep_->filter.get(), nullptr}; + } + + // Fetching from the cache + Cache* block_cache = rep_->options.block_cache.get(); + char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto key = GetCacheKey( + rep_->cache_key_prefix, + rep_->cache_key_prefix_size, + rep_->metaindex_handle, + cache_key + ); + + auto cache_handle = GetFromBlockCache( + block_cache, + key, + BLOCK_CACHE_FILTER_MISS, + BLOCK_CACHE_FILTER_HIT, + rep_->options.statistics + ); + + FilterBlockReader* filter = nullptr; + if (cache_handle != nullptr) { + filter = reinterpret_cast( + block_cache->Value(cache_handle)); + } else if (no_io) { + // Do not invoke any io. + return CachableEntry(); + } else { + size_t filter_size = 0; + std::unique_ptr meta; + std::unique_ptr iter; + auto s = ReadMetaBlock(rep_, &meta, &iter); + + if (s.ok()) { + std::string filter_block_key = kFilterBlockPrefix; + filter_block_key.append(rep_->options.filter_policy->Name()); + iter->Seek(filter_block_key); + + if (iter->Valid() && iter->key() == Slice(filter_block_key)) { + filter = ReadFilter(iter->value(), rep_, &filter_size); + assert(filter); + assert(filter_size > 0); + + cache_handle = block_cache->Insert( + key, filter, filter_size, &DeleteCachedFilter); + RecordTick(rep_->options.statistics, BLOCK_CACHE_ADD); + } + } + } + + return { filter, cache_handle }; +} + +// Get the iterator from the index block. +Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { + if (rep_->index_block) { + assert (!rep_->options.block_cache); + return rep_->index_block->NewIterator(rep_->options.comparator); + } + + // get index block from cache + assert (rep_->options.block_cache); + bool didIO = false; + CachableEntry entry; + + auto s = GetBlock( + this, + rep_->index_handle, + options, + false, /* for compaction */ + BLOCK_CACHE_INDEX_MISS, + BLOCK_CACHE_INDEX_HIT, + &didIO, + &entry + ); + + Iterator* iter; + if (entry.value != nullptr) { + iter = entry.value->NewIterator(rep_->options.comparator); + if (entry.cache_handle) { + iter->RegisterCleanup( + &ReleaseBlock, rep_->options.block_cache.get(), entry.cache_handle + ); + } else { + iter->RegisterCleanup(&DeleteBlock, entry.value, nullptr); + } + } else { + iter = NewErrorIterator(s); + } + return iter; +} + Iterator* BlockBasedTable::BlockReader(void* arg, const ReadOptions& options, const EnvOptions& soptions, @@ -577,27 +881,32 @@ Iterator* BlockBasedTable::BlockReader(void* arg, // 2) Compare(prefix(key), key) <= 0. // 3) If Compare(key1, key2) <= 0, then Compare(prefix(key1), prefix(key2)) <= 0 // -// TODO(tylerharter): right now, this won't cause I/O since blooms are -// in memory. When blooms may need to be paged in, we should refactor so that -// this is only ever called lazily. In particular, this shouldn't be called -// while the DB lock is held like it is now. +// Otherwise, this method guarantees no I/O will be incurred. +// +// REQUIRES: this method shouldn't be called while the DB lock is held. bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) { - FilterBlockReader* filter = rep_->filter; bool may_match = true; Status s; - if (filter == nullptr) { + if (!rep_->options.filter_policy) { return true; } - std::unique_ptr iiter(rep_->index_block->NewIterator( - rep_->options.comparator)); + // To prevent any io operation in this method, we set `read_tier` to make + // sure we always read index or filter only when they have already been + // loaded to memory. + ReadOptions no_io_read_options; + no_io_read_options.read_tier = kBlockCacheTier; + unique_ptr iiter( + IndexBlockReader(no_io_read_options) + ); iiter->Seek(internal_prefix); + if (!iiter->Valid()) { // we're past end of file may_match = false; } else if (ExtractUserKey(iiter->key()).starts_with( - ExtractUserKey(internal_prefix))) { + ExtractUserKey(internal_prefix))) { // we need to check for this subtle case because our only // guarantee is that "the key is a string >= last key in that data // block" according to the doc/table_format.txt spec. @@ -619,7 +928,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) { BlockHandle handle; s = handle.DecodeFrom(&handle_value); assert(s.ok()); - may_match = filter->PrefixMayMatch(handle.offset(), internal_prefix); + auto filter_entry = GetFilter(true /* no io */); + may_match = + filter_entry.value != nullptr && + filter_entry.value->PrefixMayMatch(handle.offset(), internal_prefix); + filter_entry.Release(rep_->options.block_cache.get()); } RecordTick(rep_->options.statistics, BLOOM_FILTER_PREFIX_CHECKED); @@ -641,9 +954,12 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) { } return NewTwoLevelIterator( - rep_->index_block->NewIterator(rep_->options.comparator), - &BlockBasedTable::BlockReader, const_cast(this), - options, rep_->soptions); + IndexBlockReader(options), + &BlockBasedTable::BlockReader, + const_cast(this), + options, + rep_->soptions + ); } Status BlockBasedTable::Get( @@ -654,15 +970,20 @@ Status BlockBasedTable::Get( const Slice& v, bool didIO), void (*mark_key_may_exist_handler)(void* handle_context)) { Status s; - Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); + Iterator* iiter = IndexBlockReader(readOptions); + auto filter_entry = GetFilter(readOptions.read_tier == kBlockCacheTier); + FilterBlockReader* filter = filter_entry.value; bool done = false; for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { Slice handle_value = iiter->value(); - FilterBlockReader* filter = rep_->filter; + BlockHandle handle; - if (filter != nullptr && - handle.DecodeFrom(&handle_value).ok() && - !filter->KeyMayMatch(handle.offset(), key)) { + bool may_not_exist_in_filter = + filter != nullptr && + handle.DecodeFrom(&handle_value).ok() && + !filter->KeyMayMatch(handle.offset(), key); + + if (may_not_exist_in_filter) { // Not found // TODO: think about interaction with Merge. If a user key cannot // cross one data block, we should be fine. @@ -670,7 +991,7 @@ Status BlockBasedTable::Get( break; } else { bool didIO = false; - std::unique_ptr block_iter( + unique_ptr block_iter( BlockReader(this, readOptions, iiter->value(), &didIO)); if (readOptions.read_tier && block_iter->status().IsIncomplete()) { @@ -692,6 +1013,8 @@ Status BlockBasedTable::Get( s = block_iter->status(); } } + + filter_entry.Release(rep_->options.block_cache.get()); if (s.ok()) { s = iiter->status(); } @@ -714,8 +1037,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, } uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { - Iterator* index_iter = - rep_->index_block->NewIterator(rep_->options.comparator); + Iterator* index_iter = IndexBlockReader(ReadOptions()); + index_iter->Seek(key); uint64_t result; if (index_iter->Valid()) { diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index b6e87b2f8..aba6085a5 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -13,6 +13,7 @@ #include "rocksdb/cache.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" +#include "rocksdb/statistics.h" #include "rocksdb/table_stats.h" #include "rocksdb/table.h" #include "util/coding.h" @@ -27,6 +28,7 @@ class RandomAccessFile; struct ReadOptions; class TableCache; class TableReader; +class FilterBlockReader; using std::unique_ptr; @@ -91,6 +93,9 @@ class BlockBasedTable : public TableReader { ~BlockBasedTable(); private: + template + struct CachableEntry; + struct Rep; Rep* rep_; bool compaction_optimized_; @@ -98,9 +103,37 @@ class BlockBasedTable : public TableReader { static Iterator* BlockReader(void*, const ReadOptions&, const EnvOptions& soptions, const Slice&, bool for_compaction); + static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, bool* didIO, bool for_compaction = false); + // if `no_io == true`, we will not try to read filter from sst file + // if it is not cached yet. + CachableEntry GetFilter(bool no_io = false) const; + + Iterator* IndexBlockReader(const ReadOptions& options) const; + + // Read the block, either from sst file or from cache. This method will try + // to read from cache only when block_cache is set or ReadOption doesn't + // explicitly prohibit storage IO. + // + // If the block is read from cache, the statistics for cache miss/hit of the + // the given type of block will be updated. User can specify + // `block_cache_miss_ticker` and `block_cache_hit_ticker` for the statistics + // update. + // + // On success, the `result` parameter will be populated, which contains a + // pointer to the block and its cache handle, which will be nullptr if it's + // not read from the cache. + static Status GetBlock(const BlockBasedTable* table, + const BlockHandle& handle, + const ReadOptions& options, + bool for_compaction, + Tickers block_cache_miss_ticker, + Tickers block_cache_hit_ticker, + bool* didIO, + CachableEntry* result); + // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. // May not make such a call if filter policy says that key is not present. @@ -111,6 +144,22 @@ class BlockBasedTable : public TableReader { void ReadFilter(const Slice& filter_handle_value); static Status ReadStats(const Slice& handle_value, Rep* rep); + // Read the meta block from sst. + static Status ReadMetaBlock( + Rep* rep, + std::unique_ptr* meta_block, + std::unique_ptr* iter); + + // Create the filter from the filter block. + static FilterBlockReader* ReadFilter( + const Slice& filter_handle_value, + Rep* rep, + size_t* filter_size = nullptr); + + // Read the table stats from stats block. + static Status ReadStats( + const Slice& handle_value, Rep* rep, TableStats* stats); + static void SetupCacheKeyPrefix(Rep* rep); explicit BlockBasedTable(Rep* rep) : diff --git a/table/filter_block.cc b/table/filter_block.cc index ce195a8e2..98030a4c3 100644 --- a/table/filter_block.cc +++ b/table/filter_block.cc @@ -127,7 +127,8 @@ void FilterBlockBuilder::GenerateFilter() { start_.clear(); } -FilterBlockReader::FilterBlockReader(const Options& opt, const Slice& contents) +FilterBlockReader::FilterBlockReader( + const Options& opt, const Slice& contents, bool delete_contents_after_use) : policy_(opt.filter_policy), prefix_extractor_(opt.prefix_extractor), whole_key_filtering_(opt.whole_key_filtering), @@ -143,6 +144,9 @@ FilterBlockReader::FilterBlockReader(const Options& opt, const Slice& contents) data_ = contents.data(); offset_ = data_ + last_word; num_ = (n - 5 - last_word) / 4; + if (delete_contents_after_use) { + filter_data.reset(contents.data()); + } } bool FilterBlockReader::KeyMayMatch(uint64_t block_offset, diff --git a/table/filter_block.h b/table/filter_block.h index cea60d862..e47f94653 100644 --- a/table/filter_block.h +++ b/table/filter_block.h @@ -12,6 +12,8 @@ // into a single filter block. #pragma once + +#include #include #include #include @@ -62,7 +64,10 @@ class FilterBlockBuilder { class FilterBlockReader { public: // REQUIRES: "contents" and *policy must stay live while *this is live. - FilterBlockReader(const Options& opt, const Slice& contents); + FilterBlockReader( + const Options& opt, + const Slice& contents, + bool delete_contents_after_use = false); bool KeyMayMatch(uint64_t block_offset, const Slice& key); bool PrefixMayMatch(uint64_t block_offset, const Slice& prefix); @@ -74,6 +79,8 @@ class FilterBlockReader { const char* offset_; // Pointer to beginning of offset array (at block-end) size_t num_; // Number of entries in offset array size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file) + std::unique_ptr filter_data; + bool MayMatch(uint64_t block_offset, const Slice& entry); }; diff --git a/table/table_test.cc b/table/table_test.cc index 07e8a05e8..452819d8e 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -12,18 +12,19 @@ #include #include "db/dbformat.h" +#include "db/db_statistics.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" -#include "rocksdb/table.h" #include "rocksdb/memtablerep.h" -#include "table/block.h" -#include "table/block_builder.h" -#include "table/format.h" -#include "table/block_based_table_reader.h" #include "table/block_based_table_builder.h" +#include "table/block_based_table_reader.h" +#include "table/block_builder.h" +#include "table/block.h" +#include "table/format.h" #include "util/random.h" #include "util/testharness.h" #include "util/testutil.h" @@ -486,8 +487,7 @@ struct TestArgs { }; -static std::vector Generate_Arg_List() -{ +static std::vector Generate_Arg_List() { std::vector ret; TestType test_type[4] = {TABLE_TEST, BLOCK_TEST, MEMTABLE_TEST, DB_TEST}; int test_type_len = 4; @@ -928,6 +928,181 @@ TEST(TableTest, NumBlockStat) { ); } +class BlockCacheStats { + public: + explicit BlockCacheStats(std::shared_ptr statistics) { + block_cache_miss = + statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + block_cache_hit = + statistics.get()->getTickerCount(BLOCK_CACHE_HIT); + index_block_cache_miss = + statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS); + index_block_cache_hit = + statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_HIT); + data_block_cache_miss = + statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS); + data_block_cache_hit = + statistics.get()->getTickerCount(BLOCK_CACHE_DATA_HIT); + } + + // Check if the fetched stats matches the expected ones. + void AssertEqual( + long index_block_cache_miss, + long index_block_cache_hit, + long data_block_cache_miss, + long data_block_cache_hit) const { + ASSERT_EQ(index_block_cache_miss, this->index_block_cache_miss); + ASSERT_EQ(index_block_cache_hit, this->index_block_cache_hit); + ASSERT_EQ(data_block_cache_miss, this->data_block_cache_miss); + ASSERT_EQ(data_block_cache_hit, this->data_block_cache_hit); + ASSERT_EQ( + index_block_cache_miss + data_block_cache_miss, + this->block_cache_miss + ); + ASSERT_EQ( + index_block_cache_hit + data_block_cache_hit, + this->block_cache_hit + ); + } + + private: + long block_cache_miss = 0; + long block_cache_hit = 0; + long index_block_cache_miss = 0; + long index_block_cache_hit = 0; + long data_block_cache_miss = 0; + long data_block_cache_hit = 0; +}; + +TEST(TableTest, BlockCacheTest) { + // -- Table construction + Options options; + options.create_if_missing = true; + options.statistics = CreateDBStatistics(); + options.block_cache = NewLRUCache(1024); + std::vector keys; + KVMap kvmap; + + BlockBasedTableConstructor c(BytewiseComparator()); + c.Add("key", "value"); + c.Finish(options, &keys, &kvmap); + + // -- PART 1: Open with regular block cache. + // Since block_cache is disabled, no cache activities will be involved. + unique_ptr iter; + + // At first, no block will be accessed. + { + BlockCacheStats stats(options.statistics); + // index will be added to block cache. + stats.AssertEqual( + 1, // index block miss + 0, + 0, + 0 + ); + } + + // Only index block will be accessed + { + iter.reset(c.NewIterator()); + BlockCacheStats stats(options.statistics); + // NOTE: to help better highlight the "detla" of each ticker, I use + // + to indicate the increment of changed + // value; other numbers remain the same. + stats.AssertEqual( + 1, + 0 + 1, // index block hit + 0, + 0 + ); + } + + // Only data block will be accessed + { + iter->SeekToFirst(); + BlockCacheStats stats(options.statistics); + stats.AssertEqual( + 1, + 1, + 0 + 1, // data block miss + 0 + ); + } + + // Data block will be in cache + { + iter.reset(c.NewIterator()); + iter->SeekToFirst(); + BlockCacheStats stats(options.statistics); + stats.AssertEqual( + 1, + 1 + 1, // index block hit + 1, + 0 + 1 // data block hit + ); + } + // release the iterator so that the block cache can reset correctly. + iter.reset(); + + // -- PART 2: Open without block cache + options.block_cache.reset(); + options.statistics = CreateDBStatistics(); // reset the stats + c.Reopen(options); + + { + iter.reset(c.NewIterator()); + iter->SeekToFirst(); + ASSERT_EQ("key", iter->key().ToString()); + BlockCacheStats stats(options.statistics); + // Nothing is affected at all + stats.AssertEqual(0, 0, 0, 0); + } + + // -- PART 3: Open with very small block cache + // In this test, no block will ever get hit since the block cache is + // too small to fit even one entry. + options.block_cache = NewLRUCache(1); + c.Reopen(options); + { + BlockCacheStats stats(options.statistics); + stats.AssertEqual( + 1, // index block miss + 0, + 0, + 0 + ); + } + + + { + // Both index and data block get accessed. + // It first cache index block then data block. But since the cache size + // is only 1, index block will be purged after data block is inserted. + iter.reset(c.NewIterator()); + BlockCacheStats stats(options.statistics); + stats.AssertEqual( + 1 + 1, // index block miss + 0, + 0, // data block miss + 0 + ); + } + + { + // SeekToFirst() accesses data block. With similar reason, we expect data + // block's cache miss. + iter->SeekToFirst(); + BlockCacheStats stats(options.statistics); + stats.AssertEqual( + 2, + 0, + 0 + 1, // data block miss + 0 + ); + } +} + TEST(TableTest, ApproximateOffsetOfPlain) { BlockBasedTableConstructor c(BytewiseComparator()); c.Add("k01", "hello"); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 71eabef8f..60324aefc 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1285,11 +1285,11 @@ class StressTest { ttl_state = NumberToString(FLAGS_ttl); } fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str()); - fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent); - fprintf(stdout, "Prefix percentage : %d\n", FLAGS_prefixpercent); - fprintf(stdout, "Write percentage : %d\n", FLAGS_writepercent); - fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent); - fprintf(stdout, "Iterate percentage : %d\n", FLAGS_iterpercent); + fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent); + fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent); + fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent); + fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent); + fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent); fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size); fprintf(stdout, "Iterations : %lu\n", FLAGS_num_iterations); fprintf(stdout, "Max key : %ld\n", FLAGS_max_key);