Introduce a mechanism to dump out blocks from block cache and re-insert to secondary cache (#8912)

Summary:
Background: Cache warming up will cause potential read performance degradation due to reading blocks from storage to the block cache. Since in production, the workload and access pattern to a certain DB is stable, it is a potential solution to dump out the blocks belonging to a certain DB to persist storage (e.g., to a file) and bulk-load the blocks to Secondary cache before the DB is relaunched. For example, when migrating a DB form host A to host B, it will take a short period of time, the access pattern to blocks in the block cache will not change much. It is efficient to dump out the blocks of certain DB, migrate to the destination host and insert them to the Secondary cache before we relaunch the DB.

Design: we introduce the interface of CacheDumpWriter and CacheDumpRead for user to store the blocks dumped out from block cache. RocksDB will encode all the information and send the string to the writer. User can implement their own writer it they want. CacheDumper and CacheLoad are introduced to save the blocks and load the blocks respectively.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8912

Test Plan: add new tests to lru_cache_test and pass make check.

Reviewed By: pdillinger

Differential Revision: D31452871

Pulled By: zhichao-cao

fbshipit-source-id: 11ab4f5d03e383f476947116361d54188d36ec48
This commit is contained in:
Zhichao Cao 2021-10-07 11:40:20 -07:00 committed by Facebook GitHub Bot
parent fe994bbd0b
commit 699f45049d
12 changed files with 1437 additions and 14 deletions

View File

@ -851,6 +851,8 @@ set(SOURCES
utilities/blob_db/blob_db_impl_filesnapshot.cc
utilities/blob_db/blob_dump_tool.cc
utilities/blob_db/blob_file.cc
utilities/cache_dump_load.cc
utilities/cache_dump_load_impl.cc
utilities/cassandra/cassandra_compaction_filter.cc
utilities/cassandra/format.cc
utilities/cassandra/merge_operator.cc

View File

@ -8,6 +8,7 @@
* Print information about blob files when using "ldb list_live_files_metadata"
* Provided support for SingleDelete with user defined timestamp.
* Add remote compaction read/write bytes statistics: `REMOTE_COMPACT_READ_BYTES`, `REMOTE_COMPACT_WRITE_BYTES`.
* Introduce an experimental feature to dump out the blocks from block cache and insert them to the secondary cache to reduce the cache warmup time (e.g., used while migrating DB instance). More information are in `class CacheDumper` and `CacheDumpedLoader` at `rocksdb/utilities/cache_dump_load.h` Note that, this feature is subject to the potential change in the future, it is still experimental.
### Public API change
* Made SystemClock extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.

View File

@ -370,6 +370,8 @@ cpp_library(
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
"utilities/blob_db/blob_dump_tool.cc",
"utilities/blob_db/blob_file.cc",
"utilities/cache_dump_load.cc",
"utilities/cache_dump_load_impl.cc",
"utilities/cassandra/cassandra_compaction_filter.cc",
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",
@ -693,6 +695,8 @@ cpp_library(
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
"utilities/blob_db/blob_dump_tool.cc",
"utilities/blob_db/blob_file.cc",
"utilities/cache_dump_load.cc",
"utilities/cache_dump_load_impl.cc",
"utilities/cassandra/cassandra_compaction_filter.cc",
"utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc",

View File

@ -15,9 +15,11 @@
#include "rocksdb/cache.h"
#include "rocksdb/io_status.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/utilities/cache_dump_load.h"
#include "test_util/testharness.h"
#include "util/coding.h"
#include "util/random.h"
#include "utilities/cache_dump_load_impl.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
@ -1182,6 +1184,367 @@ TEST_F(DBSecondaryCacheTest, TestSecondaryCacheMultiGet) {
Destroy(options);
}
class LRUCacheWithStat : public LRUCache {
public:
LRUCacheWithStat(
size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit,
double _high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr,
bool _use_adaptive_mutex = kDefaultToAdaptiveMutex,
CacheMetadataChargePolicy _metadata_charge_policy =
kDontChargeCacheMetadata,
const std::shared_ptr<SecondaryCache>& _secondary_cache = nullptr)
: LRUCache(_capacity, _num_shard_bits, _strict_capacity_limit,
_high_pri_pool_ratio, _memory_allocator, _use_adaptive_mutex,
_metadata_charge_policy, _secondary_cache) {
insert_count_ = 0;
lookup_count_ = 0;
}
~LRUCacheWithStat() {}
Status Insert(const Slice& key, void* value, size_t charge, DeleterFn deleter,
Handle** handle, Priority priority) override {
insert_count_++;
return LRUCache::Insert(key, value, charge, deleter, handle, priority);
}
Status Insert(const Slice& key, void* value, const CacheItemHelper* helper,
size_t chargge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
insert_count_++;
return LRUCache::Insert(key, value, helper, chargge, handle, priority);
}
Handle* Lookup(const Slice& key, Statistics* stats) override {
lookup_count_++;
return LRUCache::Lookup(key, stats);
}
Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
const CreateCallback& create_cb, Priority priority, bool wait,
Statistics* stats = nullptr) override {
lookup_count_++;
return LRUCache::Lookup(key, helper, create_cb, priority, wait, stats);
}
uint32_t GetInsertCount() { return insert_count_; }
uint32_t GetLookupcount() { return lookup_count_; }
void ResetCount() {
insert_count_ = 0;
lookup_count_ = 0;
}
private:
uint32_t insert_count_;
uint32_t lookup_count_;
};
#ifndef ROCKSDB_LITE
TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadBasic) {
LRUCacheOptions cache_opts(1024 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
LRUCacheWithStat* tmp_cache = new LRUCacheWithStat(
cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
std::shared_ptr<Cache> cache(tmp_cache);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
DestroyAndReopen(options);
fault_fs_->SetFailGetUniqueId(true);
Random rnd(301);
const int N = 256;
std::vector<std::string> value;
char buf[1000];
memset(buf, 'a', 1000);
value.resize(N);
for (int i = 0; i < N; i++) {
// std::string p_v = rnd.RandomString(1000);
std::string p_v(buf, 1000);
value[i] = p_v;
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
Compact("a", "z");
// do th eread for all the key value pairs, so all the blocks should be in
// cache
uint32_t start_insert = tmp_cache->GetInsertCount();
uint32_t start_lookup = tmp_cache->GetLookupcount();
std::string v;
for (int i = 0; i < N; i++) {
v = Get(Key(i));
ASSERT_EQ(v, value[i]);
}
uint32_t dump_insert = tmp_cache->GetInsertCount() - start_insert;
uint32_t dump_lookup = tmp_cache->GetLookupcount() - start_lookup;
ASSERT_EQ(63,
static_cast<int>(dump_insert)); // the insert in the block cache
ASSERT_EQ(256,
static_cast<int>(dump_lookup)); // the lookup in the block cache
// We have enough blocks in the block cache
CacheDumpOptions cd_options;
cd_options.clock = fault_env_->GetSystemClock().get();
std::string dump_path = db_->GetName() + "/cache_dump";
std::unique_ptr<CacheDumpWriter> dump_writer;
Status s = NewToFileCacheDumpWriter(fault_fs_, FileOptions(), dump_path,
&dump_writer);
ASSERT_OK(s);
std::unique_ptr<CacheDumper> cache_dumper;
s = NewDefaultCacheDumper(cd_options, cache, std::move(dump_writer),
&cache_dumper);
ASSERT_OK(s);
std::vector<DB*> db_list;
db_list.push_back(db_);
s = cache_dumper->SetDumpFilter(db_list);
ASSERT_OK(s);
s = cache_dumper->DumpCacheEntriesToWriter();
ASSERT_OK(s);
cache_dumper.reset();
// we have a new cache it is empty, then, before we do the Get, we do the
// dumpload
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048 * 1024);
cache_opts.secondary_cache = secondary_cache;
tmp_cache = new LRUCacheWithStat(
cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
std::shared_ptr<Cache> cache_new(tmp_cache);
table_options.block_cache = cache_new;
table_options.block_size = 4 * 1024;
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
// start to load the data to new block cache
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
std::unique_ptr<CacheDumpReader> dump_reader;
s = NewFromFileCacheDumpReader(fault_fs_, FileOptions(), dump_path,
&dump_reader);
ASSERT_OK(s);
std::unique_ptr<CacheDumpedLoader> cache_loader;
s = NewDefaultCacheDumpedLoader(cd_options, table_options, secondary_cache,
std::move(dump_reader), &cache_loader);
ASSERT_OK(s);
s = cache_loader->RestoreCacheEntriesToSecondaryCache();
ASSERT_OK(s);
uint32_t load_insert = secondary_cache->num_inserts() - start_insert;
uint32_t load_lookup = secondary_cache->num_lookups() - start_lookup;
// check the number we inserted
ASSERT_EQ(64, static_cast<int>(load_insert));
ASSERT_EQ(0, static_cast<int>(load_lookup));
ASSERT_OK(s);
Reopen(options);
// After load, we do the Get again
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
uint32_t cache_insert = tmp_cache->GetInsertCount();
uint32_t cache_lookup = tmp_cache->GetLookupcount();
for (int i = 0; i < N; i++) {
v = Get(Key(i));
ASSERT_EQ(v, value[i]);
}
uint32_t final_insert = secondary_cache->num_inserts() - start_insert;
uint32_t final_lookup = secondary_cache->num_lookups() - start_lookup;
// no insert to secondary cache
ASSERT_EQ(0, static_cast<int>(final_insert));
// lookup the secondary to get all blocks
ASSERT_EQ(64, static_cast<int>(final_lookup));
uint32_t block_insert = tmp_cache->GetInsertCount() - cache_insert;
uint32_t block_lookup = tmp_cache->GetLookupcount() - cache_lookup;
// Check the new block cache insert and lookup, should be no insert since all
// blocks are from the secondary cache.
ASSERT_EQ(0, static_cast<int>(block_insert));
ASSERT_EQ(256, static_cast<int>(block_lookup));
fault_fs_->SetFailGetUniqueId(false);
Destroy(options);
}
TEST_F(DBSecondaryCacheTest, LRUCacheDumpLoadWithFilter) {
LRUCacheOptions cache_opts(1024 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
LRUCacheWithStat* tmp_cache = new LRUCacheWithStat(
cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
std::shared_ptr<Cache> cache(tmp_cache);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
std::string dbname1 = test::PerThreadDBPath("db_1");
ASSERT_OK(DestroyDB(dbname1, options));
DB* db1 = nullptr;
ASSERT_OK(DB::Open(options, dbname1, &db1));
std::string dbname2 = test::PerThreadDBPath("db_2");
ASSERT_OK(DestroyDB(dbname2, options));
DB* db2 = nullptr;
ASSERT_OK(DB::Open(options, dbname2, &db2));
fault_fs_->SetFailGetUniqueId(true);
// write the KVs to db1
Random rnd(301);
const int N = 256;
std::vector<std::string> value1;
WriteOptions wo;
char buf[1000];
memset(buf, 'a', 1000);
value1.resize(N);
for (int i = 0; i < N; i++) {
std::string p_v(buf, 1000);
value1[i] = p_v;
ASSERT_OK(db1->Put(wo, Key(i), p_v));
}
ASSERT_OK(db1->Flush(FlushOptions()));
Slice bg("a");
Slice ed("b");
ASSERT_OK(db1->CompactRange(CompactRangeOptions(), &bg, &ed));
// Write the KVs to DB2
std::vector<std::string> value2;
memset(buf, 'b', 1000);
value2.resize(N);
for (int i = 0; i < N; i++) {
std::string p_v(buf, 1000);
value2[i] = p_v;
ASSERT_OK(db2->Put(wo, Key(i), p_v));
}
ASSERT_OK(db2->Flush(FlushOptions()));
ASSERT_OK(db2->CompactRange(CompactRangeOptions(), &bg, &ed));
// do th eread for all the key value pairs, so all the blocks should be in
// cache
uint32_t start_insert = tmp_cache->GetInsertCount();
uint32_t start_lookup = tmp_cache->GetLookupcount();
ReadOptions ro;
std::string v;
for (int i = 0; i < N; i++) {
ASSERT_OK(db1->Get(ro, Key(i), &v));
ASSERT_EQ(v, value1[i]);
}
for (int i = 0; i < N; i++) {
ASSERT_OK(db2->Get(ro, Key(i), &v));
ASSERT_EQ(v, value2[i]);
}
uint32_t dump_insert = tmp_cache->GetInsertCount() - start_insert;
uint32_t dump_lookup = tmp_cache->GetLookupcount() - start_lookup;
ASSERT_EQ(128,
static_cast<int>(dump_insert)); // the insert in the block cache
ASSERT_EQ(512,
static_cast<int>(dump_lookup)); // the lookup in the block cache
// We have enough blocks in the block cache
CacheDumpOptions cd_options;
cd_options.clock = fault_env_->GetSystemClock().get();
std::string dump_path = db1->GetName() + "/cache_dump";
std::unique_ptr<CacheDumpWriter> dump_writer;
Status s = NewToFileCacheDumpWriter(fault_fs_, FileOptions(), dump_path,
&dump_writer);
ASSERT_OK(s);
std::unique_ptr<CacheDumper> cache_dumper;
s = NewDefaultCacheDumper(cd_options, cache, std::move(dump_writer),
&cache_dumper);
ASSERT_OK(s);
std::vector<DB*> db_list;
db_list.push_back(db1);
s = cache_dumper->SetDumpFilter(db_list);
ASSERT_OK(s);
s = cache_dumper->DumpCacheEntriesToWriter();
ASSERT_OK(s);
cache_dumper.reset();
// we have a new cache it is empty, then, before we do the Get, we do the
// dumpload
std::shared_ptr<TestSecondaryCache> secondary_cache =
std::make_shared<TestSecondaryCache>(2048 * 1024);
cache_opts.secondary_cache = secondary_cache;
tmp_cache = new LRUCacheWithStat(
cache_opts.capacity, cache_opts.num_shard_bits,
cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
std::shared_ptr<Cache> cache_new(tmp_cache);
table_options.block_cache = cache_new;
table_options.block_size = 4 * 1024;
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.env = fault_env_.get();
// Start the cache loading process
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
std::unique_ptr<CacheDumpReader> dump_reader;
s = NewFromFileCacheDumpReader(fault_fs_, FileOptions(), dump_path,
&dump_reader);
ASSERT_OK(s);
std::unique_ptr<CacheDumpedLoader> cache_loader;
s = NewDefaultCacheDumpedLoader(cd_options, table_options, secondary_cache,
std::move(dump_reader), &cache_loader);
ASSERT_OK(s);
s = cache_loader->RestoreCacheEntriesToSecondaryCache();
ASSERT_OK(s);
uint32_t load_insert = secondary_cache->num_inserts() - start_insert;
uint32_t load_lookup = secondary_cache->num_lookups() - start_lookup;
// check the number we inserted
ASSERT_EQ(64, static_cast<int>(load_insert));
ASSERT_EQ(0, static_cast<int>(load_lookup));
ASSERT_OK(s);
ASSERT_OK(db1->Close());
delete db1;
ASSERT_OK(DB::Open(options, dbname1, &db1));
// After load, we do the Get again. To validate the cache, we do not allow any
// I/O, so we set the file system to false.
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
fault_fs_->SetFilesystemActive(false, error_msg);
start_insert = secondary_cache->num_inserts();
start_lookup = secondary_cache->num_lookups();
uint32_t cache_insert = tmp_cache->GetInsertCount();
uint32_t cache_lookup = tmp_cache->GetLookupcount();
for (int i = 0; i < N; i++) {
ASSERT_OK(db1->Get(ro, Key(i), &v));
ASSERT_EQ(v, value1[i]);
}
uint32_t final_insert = secondary_cache->num_inserts() - start_insert;
uint32_t final_lookup = secondary_cache->num_lookups() - start_lookup;
// no insert to secondary cache
ASSERT_EQ(0, static_cast<int>(final_insert));
// lookup the secondary to get all blocks
ASSERT_EQ(64, static_cast<int>(final_lookup));
uint32_t block_insert = tmp_cache->GetInsertCount() - cache_insert;
uint32_t block_lookup = tmp_cache->GetLookupcount() - cache_lookup;
// Check the new block cache insert and lookup, should be no insert since all
// blocks are from the secondary cache.
ASSERT_EQ(0, static_cast<int>(block_insert));
ASSERT_EQ(256, static_cast<int>(block_lookup));
fault_fs_->SetFailGetUniqueId(false);
fault_fs_->SetFilesystemActive(true);
delete db1;
delete db2;
ASSERT_OK(DestroyDB(dbname1, options));
ASSERT_OK(DestroyDB(dbname2, options));
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -23,17 +23,17 @@
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<WritableFileWriter>* writer,
IODebugContext* dbg) {
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<WritableFileWriter>* writer,
IODebugContext* dbg) {
std::unique_ptr<FSWritableFile> file;
Status s = fs->NewWritableFile(fname, file_opts, &file, dbg);
if (s.ok()) {
IOStatus io_s = fs->NewWritableFile(fname, file_opts, &file, dbg);
if (io_s.ok()) {
writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
}
return s;
return io_s;
}
IOStatus WritableFileWriter::Append(const Slice& data,

View File

@ -200,10 +200,10 @@ class WritableFileWriter {
}
}
static Status Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<WritableFileWriter>* writer,
IODebugContext* dbg);
static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<WritableFileWriter>* writer,
IODebugContext* dbg);
WritableFileWriter(const WritableFileWriter&) = delete;
WritableFileWriter& operator=(const WritableFileWriter&) = delete;

View File

@ -0,0 +1,142 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <set>
#include "rocksdb/cache.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/table.h"
namespace ROCKSDB_NAMESPACE {
// The classes and functions in this header file is used for dumping out the
// blocks in a block cache, storing or transfering the blocks to another
// destination host, and load these blocks to the secondary cache at destination
// host.
// NOTE that: The classes, functions, and data structures are EXPERIMENTAL! They
// my be changed in the future when the development continues.
// The major and minor version number of the data format to be stored/trandfered
// via CacheDumpWriter and read out via CacheDumpReader
static const int kCacheDumpMajorVersion = 0;
static const int kCacheDumpMinorVersion = 1;
// NOTE that: this class is EXPERIMENTAL! May be changed in the future!
// This is an abstract class to write or transfer the data that is created by
// CacheDumper. We pack one block with its block type, dump time, block key in
// the block cache, block len, block crc32c checksum and block itself as a unit
// and it is stored via WritePacket. Before we call WritePacket, we must call
// WriteMetadata once, which stores the sequence number, block unit checksum,
// and block unit size.
// We provide file based CacheDumpWriter to store the metadata and its package
// sequentially in a file as the defualt implementation. Users can implement
// their own CacheDumpWriter to store/transfer the data. For example, user can
// create a subclass which transfer the metadata and package on the fly.
class CacheDumpWriter {
public:
virtual ~CacheDumpWriter() = default;
// Called ONCE before the calls to WritePacket
virtual IOStatus WriteMetadata(const Slice& metadata) = 0;
virtual IOStatus WritePacket(const Slice& data) = 0;
virtual IOStatus Close() = 0;
};
// NOTE that: this class is EXPERIMENTAL! May be changed in the future!
// This is an abstract class to read or receive the data that is stored
// or transfered by CacheDumpWriter. Note that, ReadMetadata must be called
// once before we call a ReadPacket.
class CacheDumpReader {
public:
virtual ~CacheDumpReader() = default;
// Called ONCE before the calls to ReadPacket
virtual IOStatus ReadMetadata(std::string* metadata) = 0;
// Sets data to empty string on EOF
virtual IOStatus ReadPacket(std::string* data) = 0;
// (Close not needed)
};
// CacheDumpOptions is the option for CacheDumper and CacheDumpedLoader. Any
// dump or load process related control variables can be added here.
struct CacheDumpOptions {
SystemClock* clock;
};
// NOTE that: this class is EXPERIMENTAL! May be changed in the future!
// This the class to dump out the block in the block cache, store/transfer them
// via CacheDumpWriter. In order to dump out the blocks belonging to a certain
// DB or a list of DB (block cache can be shared by many DB), user needs to call
// SetDumpFilter to specify a list of DB to filter out the blocks that do not
// belong to those DB.
// A typical use case is: when we migrate a DB instance from host A to host B.
// We need to reopen the DB at host B after all the files are copied to host B.
// At this moment, the block cache at host B does not have any block from this
// migrated DB. Therefore, the read performance can be low due to cache warm up.
// By using CacheDumper before we shut down the DB at host A and using
// CacheDumpedLoader at host B before we reopen the DB, we can warmup the cache
// ahead. This function can be used in other use cases also.
class CacheDumper {
public:
virtual ~CacheDumper() = default;
// Only dump the blocks in the block cache that belong to the DBs in this list
virtual Status SetDumpFilter(std::vector<DB*> db_list) {
(void)db_list;
return Status::NotSupported("SetDumpFilter is not supported");
}
// The main function to dump out all the blocks that satisfy the filter
// condition from block cache to a certain CacheDumpWriter in one shot. This
// process may take some time.
virtual IOStatus DumpCacheEntriesToWriter() {
return IOStatus::NotSupported("DumpCacheEntriesToWriter is not supported");
}
};
// NOTE that: this class is EXPERIMENTAL! May be changed in the future!
// This is the class to load the dumped blocks to the destination cache. For now
// we only load the blocks to the SecondaryCache. In the future, we may plan to
// support loading to the block cache.
class CacheDumpedLoader {
public:
virtual ~CacheDumpedLoader() = default;
virtual IOStatus RestoreCacheEntriesToSecondaryCache() {
return IOStatus::NotSupported(
"RestoreCacheEntriesToSecondaryCache is not supported");
}
};
// Get the writer which stores all the metadata and data sequentially to a file
IOStatus NewToFileCacheDumpWriter(const std::shared_ptr<FileSystem>& fs,
const FileOptions& file_opts,
const std::string& file_name,
std::unique_ptr<CacheDumpWriter>* writer);
// Get the reader which read out the metadata and data sequentially from a file
IOStatus NewFromFileCacheDumpReader(const std::shared_ptr<FileSystem>& fs,
const FileOptions& file_opts,
const std::string& file_name,
std::unique_ptr<CacheDumpReader>* reader);
// Get the default cache dumper
Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options,
const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpWriter>&& writer,
std::unique_ptr<CacheDumper>* cache_dumper);
// Get the default cache dump loader
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader);
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

2
src.mk
View File

@ -234,6 +234,8 @@ LIB_SOURCES = \
utilities/blob_db/blob_db_impl.cc \
utilities/blob_db/blob_db_impl_filesnapshot.cc \
utilities/blob_db/blob_file.cc \
utilities/cache_dump_load.cc \
utilities/cache_dump_load_impl.cc \
utilities/cassandra/cassandra_compaction_filter.cc \
utilities/cassandra/format.cc \
utilities/cassandra/merge_operator.cc \

View File

@ -266,8 +266,8 @@ TableBuilder* MockTableFactory::NewTableBuilder(
Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
KVVector file_contents) {
std::unique_ptr<WritableFileWriter> file_writer;
auto s = WritableFileWriter::Create(env->GetFileSystem(), fname,
FileOptions(), &file_writer, nullptr);
Status s = WritableFileWriter::Create(env->GetFileSystem(), fname,
FileOptions(), &file_writer, nullptr);
if (!s.ok()) {
return s;
}

View File

@ -0,0 +1,69 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/cache_dump_load.h"
#include "file/writable_file_writer.h"
#include "port/lang.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "table/format.h"
#include "util/crc32c.h"
#include "utilities/cache_dump_load_impl.h"
namespace ROCKSDB_NAMESPACE {
IOStatus NewToFileCacheDumpWriter(const std::shared_ptr<FileSystem>& fs,
const FileOptions& file_opts,
const std::string& file_name,
std::unique_ptr<CacheDumpWriter>* writer) {
std::unique_ptr<WritableFileWriter> file_writer;
IOStatus io_s = WritableFileWriter::Create(fs, file_name, file_opts,
&file_writer, nullptr);
if (!io_s.ok()) {
return io_s;
}
writer->reset(new ToFileCacheDumpWriter(std::move(file_writer)));
return io_s;
}
IOStatus NewFromFileCacheDumpReader(const std::shared_ptr<FileSystem>& fs,
const FileOptions& file_opts,
const std::string& file_name,
std::unique_ptr<CacheDumpReader>* reader) {
std::unique_ptr<RandomAccessFileReader> file_reader;
IOStatus io_s = RandomAccessFileReader::Create(fs, file_name, file_opts,
&file_reader, nullptr);
if (!io_s.ok()) {
return io_s;
}
reader->reset(new FromFileCacheDumpReader(std::move(file_reader)));
return io_s;
}
Status NewDefaultCacheDumper(const CacheDumpOptions& dump_options,
const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpWriter>&& writer,
std::unique_ptr<CacheDumper>* cache_dumper) {
cache_dumper->reset(
new CacheDumperImpl(dump_options, cache, std::move(writer)));
return Status::OK();
}
Status NewDefaultCacheDumpedLoader(
const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader,
std::unique_ptr<CacheDumpedLoader>* cache_dump_loader) {
cache_dump_loader->reset(new CacheDumpedLoaderImpl(
dump_options, toptions, secondary_cache, std::move(reader)));
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,477 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/cache_dump_load_impl.h"
#include "cache/cache_entry_roles.h"
#include "file/writable_file_writer.h"
#include "port/lang.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/utilities/ldb_cmd.h"
#include "table/format.h"
#include "util/crc32c.h"
namespace ROCKSDB_NAMESPACE {
// Set the dump filter with a list of DBs. Block cache may be shared by multipe
// DBs and we may only want to dump out the blocks belonging to certain DB(s).
// Therefore, a filter is need to decide if the key of the block satisfy the
// requirement.
Status CacheDumperImpl::SetDumpFilter(std::vector<DB*> db_list) {
Status s = Status::OK();
for (size_t i = 0; i < db_list.size(); i++) {
assert(i < db_list.size());
TablePropertiesCollection ptc;
assert(db_list[i] != nullptr);
s = db_list[i]->GetPropertiesOfAllTables(&ptc);
if (!s.ok()) {
return s;
}
for (auto id = ptc.begin(); id != ptc.end(); id++) {
assert(id->second->db_session_id.size() == 20);
prefix_filter_.insert(id->second->db_session_id);
}
}
return s;
}
// This is the main function to dump out the cache block entries to the writer.
// The writer may create a file or write to other systems. Currently, we will
// iterate the whole block cache, get the blocks, and write them to the writer
IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
// Prepare stage, check the parameters.
if (cache_ == nullptr) {
return IOStatus::InvalidArgument("Cache is null");
}
if (writer_ == nullptr) {
return IOStatus::InvalidArgument("CacheDumpWriter is null");
}
// Set the system clock
if (options_.clock == nullptr) {
return IOStatus::InvalidArgument("System clock is null");
}
clock_ = options_.clock;
// We copy the Cache Deleter Role Map as its member.
role_map_ = CopyCacheDeleterRoleMap();
// Set the sequence number
sequence_num_ = 0;
// Dump stage, first, we write the hader
IOStatus io_s = WriteHeader();
if (!io_s.ok()) {
return io_s;
}
// Then, we iterate the block cache and dump out the blocks that are not
// filtered out.
cache_->ApplyToAllEntries(DumpOneBlockCallBack(), {});
// Finally, write the footer
io_s = WriteFooter();
if (!io_s.ok()) {
return io_s;
}
io_s = writer_->Close();
return io_s;
}
// Check if we need to filter out the block based on its key
bool CacheDumperImpl::ShouldFilterOut(const Slice& key) {
// Since now we use db_session_id as the prefix, the prefix size is 20. If
// Anything changes in the future, we need to update it here.
bool filter_out = true;
size_t prefix_size = 20;
Slice key_prefix(key.data(), prefix_size);
std::string prefix = key_prefix.ToString();
if (prefix_filter_.find(prefix) != prefix_filter_.end()) {
filter_out = false;
}
return filter_out;
}
// This is the callback function which will be applied to
// Cache::ApplyToAllEntries. In this callback function, we will get the block
// type, decide if the block needs to be dumped based on the filter, and write
// the block through the provided writer.
std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
CacheDumperImpl::DumpOneBlockCallBack() {
return [&](const Slice& key, void* value, size_t /*charge*/,
Cache::DeleterFn deleter) {
// Step 1: get the type of the block from role_map_
auto e = role_map_.find(deleter);
CacheEntryRole role;
CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax;
if (e == role_map_.end()) {
role = CacheEntryRole::kMisc;
} else {
role = e->second;
}
bool filter_out = false;
// Step 2: based on the key prefix, check if the block should be filter out.
if (ShouldFilterOut(key)) {
filter_out = true;
}
// Step 3: based on the block type, get the block raw pointer and length.
const char* block_start = nullptr;
size_t block_len = 0;
switch (role) {
case CacheEntryRole::kDataBlock:
type = CacheDumpUnitType::kData;
block_start = (static_cast<Block*>(value))->data();
block_len = (static_cast<Block*>(value))->size();
break;
case CacheEntryRole::kDeprecatedFilterBlock:
type = CacheDumpUnitType::kDeprecatedFilterBlock;
block_start = (static_cast<BlockContents*>(value))->data.data();
block_len = (static_cast<BlockContents*>(value))->data.size();
break;
case CacheEntryRole::kFilterBlock:
type = CacheDumpUnitType::kFilter;
block_start = (static_cast<ParsedFullFilterBlock*>(value))
->GetBlockContentsData()
.data();
block_len = (static_cast<ParsedFullFilterBlock*>(value))
->GetBlockContentsData()
.size();
break;
case CacheEntryRole::kFilterMetaBlock:
type = CacheDumpUnitType::kFilterMetaBlock;
block_start = (static_cast<Block*>(value))->data();
block_len = (static_cast<Block*>(value))->size();
break;
case CacheEntryRole::kIndexBlock:
type = CacheDumpUnitType::kIndex;
block_start = (static_cast<Block*>(value))->data();
block_len = (static_cast<Block*>(value))->size();
break;
case CacheEntryRole::kMisc:
filter_out = true;
break;
case CacheEntryRole::kOtherBlock:
filter_out = true;
break;
case CacheEntryRole::kWriteBuffer:
filter_out = true;
break;
default:
filter_out = true;
}
// Step 4: if the block should not be filter out, write the block to the
// CacheDumpWriter
if (!filter_out && block_start != nullptr) {
char* buffer = new char[block_len];
memcpy(buffer, block_start, block_len);
WriteCacheBlock(type, key, (void*)buffer, block_len)
.PermitUncheckedError();
delete[] buffer;
}
};
}
// Write the raw block to the writer. It takes the timestamp of the block being
// copied from block cache, block type, key, block pointer, raw block size and
// the block checksum as the input. When writing the raw block, we first create
// the dump unit and encoude it to a string. Then, we calculate the checksum of
// the how dump unit string and store it in the dump unit metadata.
// First, we write the metadata first, which is a fixed size string. Then, we
// Append the dump unit string to the writer.
IOStatus CacheDumperImpl::WriteRawBlock(uint64_t timestamp,
CacheDumpUnitType type,
const Slice& key, void* value,
size_t len, uint32_t checksum) {
// First, serilize the block information in a string
DumpUnit dump_unit;
dump_unit.timestamp = timestamp;
dump_unit.key = key;
dump_unit.type = type;
dump_unit.value_len = len;
dump_unit.value = value;
dump_unit.value_checksum = checksum;
std::string encoded_data;
CacheDumperHelper::EncodeDumpUnit(dump_unit, &encoded_data);
// Second, create the metadata, which contains a sequence number, the dump
// unit string checksum and the string size. The sequence number monotonically
// increases from 0.
DumpUnitMeta unit_meta;
unit_meta.sequence_num = sequence_num_;
sequence_num_++;
unit_meta.dump_unit_checksum =
crc32c::Value(encoded_data.c_str(), encoded_data.size());
unit_meta.dump_unit_size = static_cast<uint64_t>(encoded_data.size());
std::string encoded_meta;
CacheDumperHelper::EncodeDumpUnitMeta(unit_meta, &encoded_meta);
// We write the metadata first.
assert(writer_ != nullptr);
IOStatus io_s = writer_->WriteMetadata(Slice(encoded_meta));
if (!io_s.ok()) {
return io_s;
}
// followed by the dump unit.
return writer_->WritePacket(Slice(encoded_data));
}
// Before we write any block, we write the header first to store the cache dump
// format version, rocksdb version, and brief intro.
IOStatus CacheDumperImpl::WriteHeader() {
std::string header_key = "header";
std::ostringstream s;
s << kTraceMagic << "\t"
<< "Cache dump format version: " << kCacheDumpMajorVersion << "."
<< kCacheDumpMinorVersion << "\t"
<< "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
<< "Format: dump_unit_metadata <sequence_number, dump_unit_checksum, "
"dump_unit_size>, dump_unit <timestamp, key, block_type, "
"block_size, raw_block, raw_block_checksum> cache_value\n";
std::string header_value(s.str());
CacheDumpUnitType type = CacheDumpUnitType::kHeader;
uint64_t timestamp = clock_->NowMicros();
uint32_t header_checksum =
crc32c::Value(header_value.c_str(), header_value.size());
return WriteRawBlock(timestamp, type, Slice(header_key),
(void*)header_value.c_str(), header_value.size(),
header_checksum);
}
// Write the block dumped from cache
IOStatus CacheDumperImpl::WriteCacheBlock(const CacheDumpUnitType type,
const Slice& key, void* value,
size_t len) {
uint64_t timestamp = clock_->NowMicros();
uint32_t value_checksum = crc32c::Value((char*)value, len);
return WriteRawBlock(timestamp, type, key, value, len, value_checksum);
}
// Write the footer after all the blocks are stored to indicate the ending.
IOStatus CacheDumperImpl::WriteFooter() {
std::string footer_key = "footer";
std::ostringstream s;
std::string footer_value("cache dump completed");
CacheDumpUnitType type = CacheDumpUnitType::kFooter;
uint64_t timestamp = clock_->NowMicros();
uint32_t footer_checksum =
crc32c::Value(footer_value.c_str(), footer_value.size());
return WriteRawBlock(timestamp, type, Slice(footer_key),
(void*)footer_value.c_str(), footer_value.size(),
footer_checksum);
}
// This is the main function to restore the cache entries to secondary cache.
// First, we check if all the arguments are valid. Then, we read the block
// sequentially from the reader and insert them to the secondary cache.
IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
// TODO: remove this line when options are used in the loader
(void)options_;
// Step 1: we check if all the arguments are valid
if (secondary_cache_ == nullptr) {
return IOStatus::InvalidArgument("Secondary Cache is null");
}
if (reader_ == nullptr) {
return IOStatus::InvalidArgument("CacheDumpReader is null");
}
// we copy the Cache Deleter Role Map as its member.
role_map_ = CopyCacheDeleterRoleMap();
// Step 2: read the header
// TODO: we need to check the cache dump format version and RocksDB version
// after the header is read out.
IOStatus io_s;
DumpUnit dump_unit;
std::string data;
io_s = ReadHeader(&data, &dump_unit);
if (!io_s.ok()) {
return io_s;
}
// Step 3: read out the rest of the blocks from the reader. The loop will stop
// either I/O status is not ok or we reach to the the end.
while (io_s.ok() && dump_unit.type != CacheDumpUnitType::kFooter) {
dump_unit.reset();
data.clear();
// read the content and store in the dump_unit
io_s = ReadCacheBlock(&data, &dump_unit);
if (!io_s.ok()) {
break;
}
// create the raw_block_content based on the information in the dump_unit
BlockContents raw_block_contents(
Slice((char*)dump_unit.value, dump_unit.value_len));
Cache::CacheItemHelper* helper = nullptr;
Statistics* statistics = nullptr;
Status s = Status::OK();
// according to the block type, get the helper callback function and create
// the corresponding block
switch (dump_unit.type) {
case CacheDumpUnitType::kDeprecatedFilterBlock: {
helper = BlocklikeTraits<BlockContents>::GetCacheItemHelper(
BlockType::kFilter);
std::unique_ptr<BlockContents> block_holder;
block_holder.reset(BlocklikeTraits<BlockContents>::Create(
std::move(raw_block_contents), 0, statistics, false,
toptions_.filter_policy.get()));
// Insert the block to secondary cache.
// Note that, if we cannot get the correct helper callback, the block
// will not be inserted.
if (helper != nullptr) {
s = secondary_cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper);
}
break;
}
case CacheDumpUnitType::kFilter: {
helper = BlocklikeTraits<ParsedFullFilterBlock>::GetCacheItemHelper(
BlockType::kFilter);
std::unique_ptr<ParsedFullFilterBlock> block_holder;
block_holder.reset(BlocklikeTraits<ParsedFullFilterBlock>::Create(
std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = secondary_cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper);
}
break;
}
case CacheDumpUnitType::kData: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kData);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = secondary_cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper);
}
break;
}
case CacheDumpUnitType::kIndex: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kIndex);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(raw_block_contents), 0, statistics, false,
toptions_.filter_policy.get()));
if (helper != nullptr) {
s = secondary_cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper);
}
break;
}
case CacheDumpUnitType::kFilterMetaBlock: {
helper = BlocklikeTraits<Block>::GetCacheItemHelper(BlockType::kFilter);
std::unique_ptr<Block> block_holder;
block_holder.reset(BlocklikeTraits<Block>::Create(
std::move(raw_block_contents), toptions_.read_amp_bytes_per_bit,
statistics, false, toptions_.filter_policy.get()));
if (helper != nullptr) {
s = secondary_cache_->Insert(dump_unit.key,
(void*)(block_holder.get()), helper);
}
break;
}
case CacheDumpUnitType::kFooter:
break;
default:
continue;
}
if (!s.ok()) {
io_s = status_to_io_status(std::move(s));
}
}
if (dump_unit.type == CacheDumpUnitType::kFooter) {
return IOStatus::OK();
} else {
return io_s;
}
}
// Read and copy the dump unit metadata to std::string data, decode and create
// the unit metadata based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
DumpUnitMeta* unit_meta) {
assert(reader_ != nullptr);
assert(data != nullptr);
assert(unit_meta != nullptr);
IOStatus io_s = reader_->ReadMetadata(data);
if (!io_s.ok()) {
return io_s;
}
return status_to_io_status(
CacheDumperHelper::DecodeDumpUnitMeta(*data, unit_meta));
}
// Read and copy the dump unit to std::string data, decode and create the unit
// based on the string
IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
DumpUnit* unit) {
assert(reader_ != nullptr);
assert(data != nullptr);
assert(unit != nullptr);
IOStatus io_s = reader_->ReadPacket(data);
if (!io_s.ok()) {
return io_s;
}
if (data->size() != len) {
return IOStatus::Corruption(
"The data being read out does not match the size stored in metadata!");
}
Slice block;
return status_to_io_status(CacheDumperHelper::DecodeDumpUnit(*data, unit));
}
// Read the header
IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
DumpUnit* dump_unit) {
DumpUnitMeta header_meta;
header_meta.reset();
std::string meta_string;
IOStatus io_s = ReadDumpUnitMeta(&meta_string, &header_meta);
if (!io_s.ok()) {
return io_s;
}
io_s = ReadDumpUnit(header_meta.dump_unit_size, data, dump_unit);
if (!io_s.ok()) {
return io_s;
}
uint32_t unit_checksum = crc32c::Value(data->c_str(), data->size());
if (unit_checksum != header_meta.dump_unit_checksum) {
return IOStatus::Corruption("Read header unit corrupted!");
}
return io_s;
}
// Read the blocks after header is read out
IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
DumpUnit* dump_unit) {
// According to the write process, we read the dump_unit_metadata first
DumpUnitMeta unit_meta;
unit_meta.reset();
std::string unit_string;
IOStatus io_s = ReadDumpUnitMeta(&unit_string, &unit_meta);
if (!io_s.ok()) {
return io_s;
}
// Based on the information in the dump_unit_metadata, we read the dump_unit
// and verify if its content is correct.
io_s = ReadDumpUnit(unit_meta.dump_unit_size, data, dump_unit);
if (!io_s.ok()) {
return io_s;
}
uint32_t unit_checksum = crc32c::Value(data->c_str(), data->size());
if (unit_checksum != unit_meta.dump_unit_checksum) {
return IOStatus::Corruption(
"Checksum does not match! Read dumped unit corrupted!");
}
return io_s;
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,363 @@
// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <unordered_map>
#include "file/random_access_file_reader.h"
#include "file/writable_file_writer.h"
#include "rocksdb/utilities/cache_dump_load.h"
#include "table/block_based/block.h"
#include "table/block_based/block_like_traits.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/parsed_full_filter_block.h"
#include "table/block_based/reader_common.h"
namespace ROCKSDB_NAMESPACE {
// the read buffer size of for the default CacheDumpReader
const unsigned int kDumpReaderBufferSize = 1024; // 1KB
static const unsigned int kSizePrefixLen = 4;
enum CacheDumpUnitType : unsigned char {
kHeader = 1,
kFooter = 2,
kData = 3,
kFilter = 4,
kProperties = 5,
kCompressionDictionary = 6,
kRangeDeletion = 7,
kHashIndexPrefixes = 8,
kHashIndexMetadata = 9,
kMetaIndex = 10,
kIndex = 11,
kDeprecatedFilterBlock = 12,
kFilterMetaBlock = 13,
kBlockTypeMax,
};
// The metadata of a dump unit. After it is serilized, its size is fixed 16
// bytes.
struct DumpUnitMeta {
// sequence number is a monotonically increasing number to indicate the order
// of the blocks being written. Header is 0.
uint32_t sequence_num;
// The Crc32c checksum of its dump unit.
uint32_t dump_unit_checksum;
// The dump unit size after the dump unit is serilized to a string.
uint64_t dump_unit_size;
void reset() {
sequence_num = 0;
dump_unit_checksum = 0;
dump_unit_size = 0;
}
};
// The data structure to hold a block and its information.
struct DumpUnit {
// The timestamp when the block is identified, copied, and dumped from block
// cache
uint64_t timestamp;
// The type of the block
CacheDumpUnitType type;
// The key of this block when the block is referenced by this Cache
Slice key;
// The block size
size_t value_len;
// The Crc32c checksum of the block
uint32_t value_checksum;
// Pointer to the block. Note that, in the dump process, it points to a memory
// buffer copied from cache block. The buffer is freed when we process the
// next block. In the load process, we use an std::string to store the
// serilized dump_unit read from the reader. So it points to the memory
// address of the begin of the block in this string.
void* value;
void reset() {
timestamp = 0;
type = CacheDumpUnitType::kBlockTypeMax;
key.clear();
value_len = 0;
value_checksum = 0;
value = nullptr;
}
};
// The default implementation of the Cache Dumper
class CacheDumperImpl : public CacheDumper {
public:
CacheDumperImpl(const CacheDumpOptions& dump_options,
const std::shared_ptr<Cache>& cache,
std::unique_ptr<CacheDumpWriter>&& writer)
: options_(dump_options), cache_(cache), writer_(std::move(writer)) {}
~CacheDumperImpl() { writer_.reset(); }
Status SetDumpFilter(std::vector<DB*> db_list) override;
IOStatus DumpCacheEntriesToWriter() override;
private:
IOStatus WriteRawBlock(uint64_t timestamp, CacheDumpUnitType type,
const Slice& key, void* value, size_t len,
uint32_t checksum);
IOStatus WriteHeader();
IOStatus WriteCacheBlock(const CacheDumpUnitType type, const Slice& key,
void* value, size_t len);
IOStatus WriteFooter();
bool ShouldFilterOut(const Slice& key);
std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
DumpOneBlockCallBack();
CacheDumpOptions options_;
std::shared_ptr<Cache> cache_;
std::unique_ptr<CacheDumpWriter> writer_;
std::unordered_map<Cache::DeleterFn, CacheEntryRole> role_map_;
SystemClock* clock_;
uint32_t sequence_num_;
// The cache key prefix filter. Currently, we use db_session_id as the prefix,
// so using std::set to store the prefixes as filter is enough. Further
// improvement can be applied like BloomFilter or others to speedup the
// filtering.
std::set<std::string> prefix_filter_;
};
// The default implementation of CacheDumpedLoader
class CacheDumpedLoaderImpl : public CacheDumpedLoader {
public:
CacheDumpedLoaderImpl(const CacheDumpOptions& dump_options,
const BlockBasedTableOptions& toptions,
const std::shared_ptr<SecondaryCache>& secondary_cache,
std::unique_ptr<CacheDumpReader>&& reader)
: options_(dump_options),
toptions_(toptions),
secondary_cache_(secondary_cache),
reader_(std::move(reader)) {}
~CacheDumpedLoaderImpl() {}
IOStatus RestoreCacheEntriesToSecondaryCache() override;
private:
IOStatus ReadDumpUnitMeta(std::string* data, DumpUnitMeta* unit_meta);
IOStatus ReadDumpUnit(size_t len, std::string* data, DumpUnit* unit);
IOStatus ReadHeader(std::string* data, DumpUnit* dump_unit);
IOStatus ReadCacheBlock(std::string* data, DumpUnit* dump_unit);
CacheDumpOptions options_;
const BlockBasedTableOptions& toptions_;
std::shared_ptr<SecondaryCache> secondary_cache_;
std::unique_ptr<CacheDumpReader> reader_;
std::unordered_map<Cache::DeleterFn, CacheEntryRole> role_map_;
};
// The default implementation of CacheDumpWriter. We write the blocks to a file
// sequentially.
class ToFileCacheDumpWriter : public CacheDumpWriter {
public:
explicit ToFileCacheDumpWriter(
std::unique_ptr<WritableFileWriter>&& file_writer)
: file_writer_(std::move(file_writer)) {}
~ToFileCacheDumpWriter() { Close().PermitUncheckedError(); }
// Write the serilized metadata to the file
virtual IOStatus WriteMetadata(const Slice& metadata) override {
assert(file_writer_ != nullptr);
std::string prefix;
PutFixed32(&prefix, static_cast<uint32_t>(metadata.size()));
IOStatus io_s = file_writer_->Append(Slice(prefix));
if (!io_s.ok()) {
return io_s;
}
io_s = file_writer_->Append(metadata);
return io_s;
}
// Write the serilized data to the file
virtual IOStatus WritePacket(const Slice& data) override {
assert(file_writer_ != nullptr);
std::string prefix;
PutFixed32(&prefix, static_cast<uint32_t>(data.size()));
IOStatus io_s = file_writer_->Append(Slice(prefix));
if (!io_s.ok()) {
return io_s;
}
io_s = file_writer_->Append(data);
return io_s;
}
// Reset the writer
virtual IOStatus Close() override {
file_writer_.reset();
return IOStatus::OK();
}
private:
std::unique_ptr<WritableFileWriter> file_writer_;
};
// The default implementation of CacheDumpReader. It is implemented based on
// RandomAccessFileReader. Note that, we keep an internal variable to remember
// the current offset.
class FromFileCacheDumpReader : public CacheDumpReader {
public:
explicit FromFileCacheDumpReader(
std::unique_ptr<RandomAccessFileReader>&& reader)
: file_reader_(std::move(reader)),
offset_(0),
buffer_(new char[kDumpReaderBufferSize]) {}
~FromFileCacheDumpReader() { delete[] buffer_; }
virtual IOStatus ReadMetadata(std::string* metadata) override {
uint32_t metadata_len = 0;
IOStatus io_s = ReadSizePrefix(&metadata_len);
if (!io_s.ok()) {
return io_s;
}
return Read(metadata_len, metadata);
}
virtual IOStatus ReadPacket(std::string* data) override {
uint32_t data_len = 0;
IOStatus io_s = ReadSizePrefix(&data_len);
if (!io_s.ok()) {
return io_s;
}
return Read(data_len, data);
}
private:
IOStatus ReadSizePrefix(uint32_t* len) {
std::string prefix;
IOStatus io_s = Read(kSizePrefixLen, &prefix);
if (!io_s.ok()) {
return io_s;
}
Slice encoded_slice(prefix);
if (!GetFixed32(&encoded_slice, len)) {
return IOStatus::Corruption("Decode size prefix string failed");
}
return IOStatus::OK();
}
IOStatus Read(size_t len, std::string* data) {
assert(file_reader_ != nullptr);
IOStatus io_s;
unsigned int bytes_to_read = static_cast<unsigned int>(len);
unsigned int to_read = bytes_to_read > kDumpReaderBufferSize
? kDumpReaderBufferSize
: bytes_to_read;
while (to_read > 0) {
io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_,
buffer_, nullptr);
if (!io_s.ok()) {
return io_s;
}
if (result_.size() < to_read) {
return IOStatus::Corruption("Corrupted cache dump file.");
}
data->append(result_.data(), result_.size());
offset_ += to_read;
bytes_to_read -= to_read;
to_read = bytes_to_read > kDumpReaderBufferSize ? kDumpReaderBufferSize
: bytes_to_read;
}
return io_s;
}
std::unique_ptr<RandomAccessFileReader> file_reader_;
Slice result_;
size_t offset_;
char* buffer_;
};
// The cache dump and load helper class
class CacheDumperHelper {
public:
// serilize the dump_unit_meta to a string, it is fixed 16 bytes size.
static void EncodeDumpUnitMeta(const DumpUnitMeta& meta, std::string* data) {
assert(data);
PutFixed32(data, static_cast<uint32_t>(meta.sequence_num));
PutFixed32(data, static_cast<uint32_t>(meta.dump_unit_checksum));
PutFixed64(data, meta.dump_unit_size);
}
// Serilize the dump_unit to a string.
static void EncodeDumpUnit(const DumpUnit& dump_unit, std::string* data) {
assert(data);
PutFixed64(data, dump_unit.timestamp);
data->push_back(dump_unit.type);
PutLengthPrefixedSlice(data, dump_unit.key);
PutFixed32(data, static_cast<uint32_t>(dump_unit.value_len));
PutFixed32(data, dump_unit.value_checksum);
PutLengthPrefixedSlice(data,
Slice((char*)dump_unit.value, dump_unit.value_len));
}
// Deserilize the dump_unit_meta from a string
static Status DecodeDumpUnitMeta(const std::string& encoded_data,
DumpUnitMeta* unit_meta) {
assert(unit_meta != nullptr);
Slice encoded_slice = Slice(encoded_data);
if (!GetFixed32(&encoded_slice, &(unit_meta->sequence_num))) {
return Status::Incomplete("Decode dumped unit meta sequence_num failed");
}
if (!GetFixed32(&encoded_slice, &(unit_meta->dump_unit_checksum))) {
return Status::Incomplete(
"Decode dumped unit meta dump_unit_checksum failed");
}
if (!GetFixed64(&encoded_slice, &(unit_meta->dump_unit_size))) {
return Status::Incomplete(
"Decode dumped unit meta dump_unit_size failed");
}
return Status::OK();
}
// Deserilize the dump_unit from a string.
static Status DecodeDumpUnit(const std::string& encoded_data,
DumpUnit* dump_unit) {
assert(dump_unit != nullptr);
Slice encoded_slice = Slice(encoded_data);
// Decode timestamp
if (!GetFixed64(&encoded_slice, &dump_unit->timestamp)) {
return Status::Incomplete("Decode dumped unit string failed");
}
// Decode the block type
dump_unit->type = static_cast<CacheDumpUnitType>(encoded_slice[0]);
encoded_slice.remove_prefix(1);
// Decode the key
if (!GetLengthPrefixedSlice(&encoded_slice, &(dump_unit->key))) {
return Status::Incomplete("Decode dumped unit string failed");
}
// Decode the value size
uint32_t value_len;
if (!GetFixed32(&encoded_slice, &value_len)) {
return Status::Incomplete("Decode dumped unit string failed");
}
dump_unit->value_len = static_cast<size_t>(value_len);
// Decode the value checksum
if (!GetFixed32(&encoded_slice, &(dump_unit->value_checksum))) {
return Status::Incomplete("Decode dumped unit string failed");
}
// Decode the block content and copy to the memory space whose pointer
// will be managed by the cache finally.
Slice block;
if (!GetLengthPrefixedSlice(&encoded_slice, &block)) {
return Status::Incomplete("Decode dumped unit string failed");
}
dump_unit->value = (void*)block.data();
assert(block.size() == dump_unit->value_len);
return Status::OK();
}
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE