Add lowest_used_cache_tier to ImmutableDBOptions to enable or disable Secondary Cache (#9050)

Summary:
Currently, if Secondary Cache is provided to the lru cache, it is used by default. We add CacheTier to advanced_options.h to describe the cache tier we used. Add a `lowest_used_cache_tier` option to `DBOptions` (immutable) and pass it to BlockBasedTableReader to decide if secondary cache will be used or not. By default it is `CacheTier::kNonVolatileTier`, which means, we always use both block cache (kVolatileTier) and secondary cache (kNonVolatileTier). By set it to `CacheTier::kVolatileTier`, the DB will not use the secondary cache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9050

Test Plan: added new tests

Reviewed By: anand1976

Differential Revision: D31744769

Pulled By: zhichao-cao

fbshipit-source-id: a0575ebd23e1c6dfcfc2b4c8578764e73b15bce6
This commit is contained in:
Zhichao Cao 2021-10-19 15:53:16 -07:00 committed by Facebook GitHub Bot
parent f20b07cebb
commit 6d93b87588
10 changed files with 410 additions and 32 deletions

View File

@ -26,6 +26,7 @@
* Some fields of SstFileMetaData are deprecated for compatibility with new base class FileStorageInfo.
* Add `file_temperature` to `IngestExternalFileArg` such that when ingesting SST files, we are able to indicate the temperature of the this batch of files.
* If `DB::Close()` failed with a non aborted status, calling `DB::Close()` again will return the original status instead of Status::OK.
* Add CacheTier to advanced_options.h to describe the cache tier we used. Add a `lowest_used_cache_tier` option to `DBOptions` (immutable) and pass it to BlockBasedTableReader. By default it is `CacheTier::kNonVolatileBlockTier`, which means, we always use both block cache (kVolatileTier) and secondary cache (kNonVolatileBlockTier). By set it to `CacheTier::kVolatileTier`, the DB will not use the secondary cache.
### Performance Improvements
* Improved CPU efficiency of building block-based table (SST) files (#9039 and #9040).

View File

@ -695,7 +695,7 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness1) {
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB do the paranoid check for the new
// After Flush is successful, RocksDB will do the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. When block_2 is cache miss and read out, it is
// inserted to the block cache. Note that, block_1 is never successfully
@ -789,7 +789,7 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness2) {
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB do the paranoid check for the new
// After Flush is successful, RocksDB will do the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. When block_2 is cache miss and read out, it is
// inserted to the block cache. Thefore, block_1 is evicted from block
@ -883,7 +883,7 @@ TEST_F(DBSecondaryCacheTest, NoSecondaryCacheInsertion) {
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB do the paranoid check for the new
// After Flush is successful, RocksDB will do the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. Now, block cache is large enough, it cache
// both block_1 and block_2. When first time read block_1 and block_2
@ -985,7 +985,7 @@ TEST_F(DBSecondaryCacheTest, SecondaryCacheFailureTest) {
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB do the paranoid check for the new
// After Flush is successful, RocksDB will do the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. When block_2 is cache miss and read out, it is
// inserted to the block cache. Note that, block_1 is never successfully
@ -1543,6 +1543,307 @@ TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
ASSERT_OK(DestroyDB(dbname2, options));
}
// Test the option not to use the secondary cache in a certain DB.
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheOptionBasic) {
LRUCacheOptions opts(4 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
options.lowest_used_cache_tier = CacheTier::kVolatileTier;
// Set the file paranoid check, so after flush, the file will be read
// all the blocks will be accessed.
options.paranoid_file_checks = true;
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i + 70), p_v));
}
ASSERT_OK(Flush());
// Flush will trigger the paranoid check and read blocks. But only block cache
// will be read. No operations for secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
Compact("a", "z");
// Compaction will also insert and evict blocks, no operations to the block
// cache. No operations for secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
std::string v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// Check the data in first block. Cache miss, direclty read from SST file.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// Check the second block.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// block cache hit
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
v = Get(Key(70));
ASSERT_EQ(1007, v.size());
// Check the first block in the second SST file. Cache miss and trigger SST
// file read. No operations for secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
v = Get(Key(75));
ASSERT_EQ(1007, v.size());
// Check the second block in the second SST file. Cache miss and trigger SST
// file read. No operations for secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
Destroy(options);
}
// We disable the secondary cache in DBOptions at first. Close and reopen the DB
// with new options, which set the lowest_used_cache_tier to
// kNonVolatileBlockTier. So secondary cache will be used.
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheOptionChange) {
LRUCacheOptions opts(4 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
fault_fs_->SetFailGetUniqueId(true);
options.lowest_used_cache_tier = CacheTier::kVolatileTier;
// Set the file paranoid check, so after flush, the file will be read
// all the blocks will be accessed.
options.paranoid_file_checks = true;
DestroyAndReopen(options);
std::string session_id;
ASSERT_OK(db_->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i + 70), p_v));
}
ASSERT_OK(Flush());
// Flush will trigger the paranoid check and read blocks. But only block cache
// will be read.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
Compact("a", "z");
// Compaction will also insert and evict blocks, no operations to the block
// cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
std::string v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// Check the data in first block. Cache miss, direclty read from SST file.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// Check the second block.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// block cache hit
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
// Change the option to enable secondary cache after we Reopen the DB
options.lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier;
Reopen(options);
v = Get(Key(70));
ASSERT_EQ(1007, v.size());
// Enable the secondary cache, trigger lookup of the first block in second SST
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 1u);
v = Get(Key(75));
ASSERT_EQ(1007, v.size());
// trigger lookup of the second block in second SST
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
Destroy(options);
}
// Two DB test. We create 2 DBs sharing the same block cache and secondary
// cache. We diable the secondary cache option for DB2.
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheOptionTwoDB) {
LRUCacheOptions opts(4 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
options.paranoid_file_checks = true;
std::string dbname1 = test::PerThreadDBPath("db_t_1");
ASSERT_OK(DestroyDB(dbname1, options));
DB* db1 = nullptr;
ASSERT_OK(DB::Open(options, dbname1, &db1));
std::string dbname2 = test::PerThreadDBPath("db_t_2");
ASSERT_OK(DestroyDB(dbname2, options));
DB* db2 = nullptr;
Options options2 = options;
options2.lowest_used_cache_tier = CacheTier::kVolatileTier;
ASSERT_OK(DB::Open(options2, dbname2, &db2));
fault_fs_->SetFailGetUniqueId(true);
// Set the file paranoid check, so after flush, the file will be read
// all the blocks will be accessed.
std::string session_id;
ASSERT_OK(db1->GetDbSessionId(session_id));
secondary_cache->SetDbSessionId(session_id);
WriteOptions wo;
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(db1->Put(wo, Key(i), p_v));
}
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 0u);
ASSERT_OK(db1->Flush(FlushOptions()));
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(db2->Put(wo, Key(i), p_v));
}
// No change in the secondary cache, since it is disabled in DB2
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
ASSERT_OK(db2->Flush(FlushOptions()));
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
Slice bg("a");
Slice ed("b");
ASSERT_OK(db1->CompactRange(CompactRangeOptions(), &bg, &ed));
ASSERT_OK(db2->CompactRange(CompactRangeOptions(), &bg, &ed));
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
ReadOptions ro;
std::string v;
ASSERT_OK(db1->Get(ro, Key(0), &v));
ASSERT_EQ(1007, v.size());
// DB 1 has lookup block 1 and it is miss in block cache, trigger secondary
// cache lookup
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 3u);
ASSERT_OK(db1->Get(ro, Key(5), &v));
ASSERT_EQ(1007, v.size());
// DB 1 lookup the second block and it is miss in block cache, trigger
// secondary cache lookup
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
ASSERT_OK(db2->Get(ro, Key(0), &v));
ASSERT_EQ(1007, v.size());
// For db2, it is not enabled with secondary cache, so no search in the
// secondary cache
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
ASSERT_OK(db2->Get(ro, Key(5), &v));
ASSERT_EQ(1007, v.size());
// For db2, it is not enabled with secondary cache, so no search in the
// secondary cache
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
fault_fs_->SetFailGetUniqueId(false);
fault_fs_->SetFilesystemActive(true);
delete db1;
delete db2;
ASSERT_OK(DestroyDB(dbname1, options));
ASSERT_OK(DestroyDB(dbname2, options));
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE

View File

@ -202,6 +202,14 @@ enum class Temperature : uint8_t {
kCold = 0x0C,
};
// The control option of how the cache tiers will be used. Currently rocksdb
// support block cahe (volatile tier), secondary cache (non-volatile tier).
// In the future, we may add more caching layers.
enum class CacheTier : uint8_t {
kVolatileTier = 0,
kNonVolatileBlockTier = 0x01,
};
enum UpdateStatus { // Return status For inplace update callback
UPDATE_FAILED = 0, // Nothing to update
UPDATED_INPLACE = 1, // Value updated inplace

View File

@ -1336,6 +1336,18 @@ struct DBOptions {
// backward/forward compatibility support for now. Some known issues are still
// under development.
std::shared_ptr<CompactionService> compaction_service = nullptr;
// It indicates, which lowest cache tier we want to
// use for a certain DB. Currently we support volatile_tier and
// non_volatile_tier. They are layered. By setting it to kVolatileTier, only
// the block cache (current implemented volatile_tier) is used. So
// cache entries will not spill to secondary cache (current
// implemented non_volatile_tier), and block cache lookup misses will not
// lookup in the secondary cache. When kNonVolatileBlockTier is used, we use
// both block cache and secondary cache.
//
// Default: kNonVolatileBlockTier
CacheTier lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier;
};
// Options to control the behavior of a database (passed to DB::Open)

View File

@ -41,6 +41,10 @@ static std::unordered_map<std::string, DBOptions::AccessHint>
{"SEQUENTIAL", DBOptions::AccessHint::SEQUENTIAL},
{"WILLNEED", DBOptions::AccessHint::WILLNEED}};
static std::unordered_map<std::string, CacheTier> cache_tier_string_map = {
{"kVolatileTier", CacheTier::kVolatileTier},
{"kNonVolatileBlockTier", CacheTier::kNonVolatileBlockTier}};
static std::unordered_map<std::string, InfoLogLevel> info_log_level_string_map =
{{"DEBUG_LEVEL", InfoLogLevel::DEBUG_LEVEL},
{"INFO_LEVEL", InfoLogLevel::INFO_LEVEL},
@ -524,6 +528,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
return Status::OK();
},
nullptr}},
{"lowest_used_cache_tier",
OptionTypeInfo::Enum<CacheTier>(
offsetof(struct ImmutableDBOptions, lowest_used_cache_tier),
&cache_tier_string_map, OptionTypeFlags::kNone)},
};
const std::string OptionsHelper::kDBOptionsName = "DBOptions";
@ -723,6 +731,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_data_in_errors(options.allow_data_in_errors),
db_host_id(options.db_host_id),
checksum_handoff_file_types(options.checksum_handoff_file_types),
lowest_used_cache_tier(options.lowest_used_cache_tier),
compaction_service(options.compaction_service) {
stats = statistics.get();
fs = env->GetFileSystem();

View File

@ -99,6 +99,7 @@ struct ImmutableDBOptions {
bool allow_data_in_errors;
std::string db_host_id;
FileTypeSet checksum_handoff_file_types;
CacheTier lowest_used_cache_tier;
// Convenience/Helper objects that are not part of the base DBOptions
std::shared_ptr<FileSystem> fs;
SystemClock* clock;

View File

@ -186,6 +186,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.allow_data_in_errors = immutable_db_options.allow_data_in_errors;
options.checksum_handoff_file_types =
immutable_db_options.checksum_handoff_file_types;
options.lowest_used_cache_tier = immutable_db_options.lowest_used_cache_tier;
return options;
}

View File

@ -342,6 +342,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"max_bgerror_resume_count=2;"
"bgerror_resume_retry_interval=1000000"
"db_host_id=hostname;"
"lowest_used_cache_tier=kNonVolatileBlockTier;"
"allow_data_in_errors=false",
new_options));

View File

@ -353,12 +353,17 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(
}
Cache::Handle* BlockBasedTable::GetEntryFromCache(
Cache* block_cache, const Slice& key, BlockType block_type, const bool wait,
GetContext* get_context, const Cache::CacheItemHelper* cache_helper,
const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
BlockType block_type, const bool wait, GetContext* get_context,
const Cache::CacheItemHelper* cache_helper,
const Cache::CreateCallback& create_cb, Cache::Priority priority) const {
auto cache_handle =
block_cache->Lookup(key, cache_helper, create_cb, priority, wait,
rep_->ioptions.statistics.get());
Cache::Handle* cache_handle = nullptr;
if (cache_tier == CacheTier::kNonVolatileBlockTier) {
cache_handle = block_cache->Lookup(key, cache_helper, create_cb, priority,
wait, rep_->ioptions.statistics.get());
} else {
cache_handle = block_cache->Lookup(key, rep_->ioptions.statistics.get());
}
if (cache_handle != nullptr) {
UpdateCacheHitMetrics(block_type, get_context,
@ -370,6 +375,23 @@ Cache::Handle* BlockBasedTable::GetEntryFromCache(
return cache_handle;
}
template <typename TBlocklike>
Status BlockBasedTable::InsertEntryToCache(
const CacheTier& cache_tier, Cache* block_cache, const Slice& key,
const Cache::CacheItemHelper* cache_helper,
std::unique_ptr<TBlocklike>& block_holder, size_t charge,
Cache::Handle** cache_handle, Cache::Priority priority) const {
Status s = Status::OK();
if (cache_tier == CacheTier::kNonVolatileBlockTier) {
s = block_cache->Insert(key, block_holder.get(), cache_helper, charge,
cache_handle, priority);
} else {
s = block_cache->Insert(key, block_holder.get(), charge,
cache_helper->del_cb, cache_handle, priority);
}
return s;
}
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep,
const std::string& db_session_id,
@ -1174,8 +1196,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Lookup uncompressed cache first
if (block_cache != nullptr) {
auto cache_handle = GetEntryFromCache(
block_cache, block_cache_key, block_type, wait, get_context,
Cache::Handle* cache_handle = nullptr;
cache_handle = GetEntryFromCache(
rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
block_type, wait, get_context,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
priority);
if (cache_handle != nullptr) {
@ -1195,12 +1219,18 @@ Status BlockBasedTable::GetDataBlockFromCache(
assert(!compressed_block_cache_key.empty());
BlockContents contents;
Cache::CreateCallback create_cb_special = GetCreateCallback<BlockContents>(
read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
block_cache_compressed_handle = block_cache_compressed->Lookup(
compressed_block_cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
create_cb_special, priority, true);
if (rep_->ioptions.lowest_used_cache_tier ==
CacheTier::kNonVolatileBlockTier) {
Cache::CreateCallback create_cb_special = GetCreateCallback<BlockContents>(
read_amp_bytes_per_bit, statistics, using_zstd, filter_policy);
block_cache_compressed_handle = block_cache_compressed->Lookup(
compressed_block_cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
create_cb_special, priority, true);
} else {
block_cache_compressed_handle =
block_cache_compressed->Lookup(compressed_block_cache_key, statistics);
}
// if we found in the compressed cache, then uncompress and insert into
// uncompressed cache
@ -1237,10 +1267,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
read_options.fill_cache) {
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(
block_cache_key, block_holder.get(),
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
&cache_handle, priority);
s = InsertEntryToCache(
rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
block_holder, charge, &cache_handle, priority);
if (s.ok()) {
assert(cache_handle != nullptr);
block->SetCachedValue(block_holder.release(), block_cache,
@ -1325,18 +1355,23 @@ Status BlockBasedTable::PutDataBlockToCache(
// We cannot directly put raw_block_contents because this could point to
// an object in the stack.
BlockContents* block_cont_for_comp_cache =
new BlockContents(std::move(*raw_block_contents));
s = block_cache_compressed->Insert(
compressed_block_cache_key, block_cont_for_comp_cache,
std::unique_ptr<BlockContents> block_cont_for_comp_cache(
new BlockContents(std::move(*raw_block_contents)));
s = InsertEntryToCache(
rep_->ioptions.lowest_used_cache_tier, block_cache_compressed,
compressed_block_cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
block_cont_for_comp_cache->ApproximateMemoryUsage());
block_cont_for_comp_cache,
block_cont_for_comp_cache->ApproximateMemoryUsage(), nullptr,
Cache::Priority::LOW);
BlockContents* block_cont_raw_ptr = block_cont_for_comp_cache.release();
if (s.ok()) {
// Avoid the following code to delete this cached block.
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
delete block_cont_for_comp_cache;
delete block_cont_raw_ptr;
}
}
@ -1344,10 +1379,10 @@ Status BlockBasedTable::PutDataBlockToCache(
if (block_cache != nullptr && block_holder->own_bytes()) {
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(
block_cache_key, block_holder.get(),
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
&cache_handle, priority);
s = InsertEntryToCache(
rep_->ioptions.lowest_used_cache_tier, block_cache, block_cache_key,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type),
block_holder, charge, &cache_handle, priority);
if (s.ok()) {
assert(cache_handle != nullptr);
cached_block->SetCachedValue(block_holder.release(), block_cache,

View File

@ -275,13 +275,22 @@ class BlockBasedTable : public TableReader {
void UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
Cache::Handle* GetEntryFromCache(const CacheTier& cache_tier,
Cache* block_cache, const Slice& key,
BlockType block_type, const bool wait,
GetContext* get_context,
const Cache::CacheItemHelper* cache_helper,
const Cache::CreateCallback& create_cb,
Cache::Priority priority) const;
template <typename TBlocklike>
Status InsertEntryToCache(const CacheTier& cache_tier, Cache* block_cache,
const Slice& key,
const Cache::CacheItemHelper* cache_helper,
std::unique_ptr<TBlocklike>& block_holder,
size_t charge, Cache::Handle** cache_handle,
Cache::Priority priority) const;
// Either Block::NewDataIterator() or Block::NewIndexIterator().
template <typename TBlockIter>
static TBlockIter* InitBlockIterator(const Rep* rep, Block* block,