Add statistics support on CompactionService remote side (#8368)

Summary:
Add statistics option on CompactionService remote side.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8368

Test Plan: `make check`

Reviewed By: ajkr

Differential Revision: D28944427

Pulled By: jay-zhuang

fbshipit-source-id: 2a19217f4a69b6e511af87eed12391860ef00c5e
This commit is contained in:
Jay Zhuang 2021-06-29 11:46:44 -07:00 committed by Facebook GitHub Bot
parent 3503f28982
commit 93a7389442
4 changed files with 36 additions and 19 deletions

View File

@ -11,6 +11,8 @@
* ldb has a new feature, `list_live_files_metadata`, that shows the live SST files, as well as their LSM storage level and the column family they belong to.
* The new BlobDB implementation now tracks the amount of garbage in each blob file in the MANIFEST.
* Integrated BlobDB now supports Merge with base values (Put/Delete etc.).
* RemoteCompaction supports sub-compaction, the job_id in the user interface is changed from `int` to `uint64_t` to support sub-compaction id.
* Expose statistics option in RemoteCompaction worker.
### Public API change
* Added APIs to the Customizable class to allow developers to create their own Customizable classes. Created the utilities/customizable_util.h file to contain helper methods for developing new Customizable classes.

View File

@ -12,9 +12,9 @@ namespace ROCKSDB_NAMESPACE {
class MyTestCompactionService : public CompactionService {
public:
MyTestCompactionService(const std::string& db_path,
std::shared_ptr<FileSystem> fs, Options& options)
: db_path_(db_path), fs_(fs), options_(options) {}
MyTestCompactionService(const std::string& db_path, Options& options,
std::shared_ptr<Statistics> statistics = nullptr)
: db_path_(db_path), options_(options), statistics_(statistics) {}
static const char* kClassName() { return "MyTestCompactionService"; }
@ -54,6 +54,7 @@ class MyTestCompactionService : public CompactionService {
options_override.prefix_extractor = options_.prefix_extractor;
options_override.table_factory = options_.table_factory;
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
options_override.statistics = statistics_;
Status s = DB::OpenAndCompact(
db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(job_id),
@ -75,8 +76,8 @@ class MyTestCompactionService : public CompactionService {
std::atomic_int compaction_num_{0};
std::map<uint64_t, std::string> jobs_;
const std::string db_path_;
std::shared_ptr<FileSystem> fs_;
Options options_;
std::shared_ptr<Statistics> statistics_;
};
class CompactionServiceTest : public DBTestBase {
@ -123,8 +124,10 @@ class CompactionServiceTest : public DBTestBase {
TEST_F(CompactionServiceTest, BasicCompactions) {
Options options = CurrentOptions();
options.env = env_;
options.statistics = CreateDBStatistics();
std::shared_ptr<Statistics> compactor_statistics = CreateDBStatistics();
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
dbname_, options, compactor_statistics);
DestroyAndReopen(options);
@ -158,6 +161,12 @@ TEST_F(CompactionServiceTest, BasicCompactions) {
dynamic_cast<MyTestCompactionService*>(options.compaction_service.get());
ASSERT_GE(my_cs->GetCompactionNum(), 1);
// make sure the compaction statistics is only recorded on remote side
ASSERT_GE(
compactor_statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY), 1);
ASSERT_EQ(options.statistics->getTickerCount(COMPACTION_KEY_DROP_NEWER_ENTRY),
0);
// Test failed compaction
SyncPoint::GetInstance()->SetCallBack(
"DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
@ -195,8 +204,8 @@ TEST_F(CompactionServiceTest, ManualCompaction) {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
GenerateTestData();
@ -236,8 +245,8 @@ TEST_F(CompactionServiceTest, FailedToStart) {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
GenerateTestData();
@ -261,8 +270,8 @@ TEST_F(CompactionServiceTest, InvalidResult) {
Options options = CurrentOptions();
options.env = env_;
options.disable_auto_compactions = true;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
GenerateTestData();
@ -288,8 +297,8 @@ TEST_F(CompactionServiceTest, SubCompaction) {
options.max_subcompactions = 10;
options.target_file_size_base = 1 << 10; // 1KB
options.disable_auto_compactions = true;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
GenerateTestData();
@ -330,8 +339,8 @@ TEST_F(CompactionServiceTest, CompactionFilter) {
options.env = env_;
auto delete_comp_filter = PartialDeleteCompactionFilter();
options.compaction_filter = &delete_comp_filter;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
@ -373,8 +382,8 @@ TEST_F(CompactionServiceTest, CompactionFilter) {
TEST_F(CompactionServiceTest, Snapshot) {
Options options = CurrentOptions();
options.env = env_;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
DestroyAndReopen(options);
@ -400,8 +409,8 @@ TEST_F(CompactionServiceTest, ConcurrentCompaction) {
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 100;
options.env = env_;
options.compaction_service = std::make_shared<MyTestCompactionService>(
dbname_, env_->GetFileSystem(), options);
options.compaction_service =
std::make_shared<MyTestCompactionService>(dbname_, options);
options.max_background_jobs = 20;
DestroyAndReopen(options);

View File

@ -767,6 +767,7 @@ Status DB::OpenAndCompact(
compaction_input.db_options.env = override_options.env;
compaction_input.db_options.file_checksum_gen_factory =
override_options.file_checksum_gen_factory;
compaction_input.db_options.statistics = override_options.statistics;
compaction_input.column_family.options.comparator =
override_options.comparator;
compaction_input.column_family.options.merge_operator =

View File

@ -1797,6 +1797,11 @@ struct CompactionServiceOptionsOverride {
std::shared_ptr<const SliceTransform> prefix_extractor = nullptr;
std::shared_ptr<TableFactory> table_factory;
std::shared_ptr<SstPartitionerFactory> sst_partitioner_factory = nullptr;
// statistics is used to collect DB operation metrics, the metrics won't be
// returned to CompactionService primary host, to collect that, the user needs
// to set it here.
std::shared_ptr<Statistics> statistics = nullptr;
};
} // namespace ROCKSDB_NAMESPACE