From b84e3363f5c7c606d1641ce69fb55fc744bc2152 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Tue, 17 May 2022 20:57:51 -0700 Subject: [PATCH] Add table_properties_collector_factories override (#9995) Summary: Add table_properties_collector_factories override on the remote side. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9995 Test Plan: unittest added Reviewed By: ajkr Differential Revision: D36392623 Pulled By: jay-zhuang fbshipit-source-id: 3ba031294d90247ca063d7de7b43178d38e3f66a --- HISTORY.md | 1 + db/compaction/compaction_service_test.cc | 110 ++++++++++++++++++++++- db/db_impl/db_impl_secondary.cc | 2 + include/rocksdb/options.h | 5 ++ 4 files changed, 115 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 47047bca3..61cb7cd7a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,6 +10,7 @@ ### New Features * DB::GetLiveFilesStorageInfo is ready for production use. * Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion. +* RemoteCompaction supports table_properties_collector_factories override on compaction worker. ### Public API changes * Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions. diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 6fff7aa7a..d073cbe2e 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -15,13 +15,17 @@ class MyTestCompactionService : public CompactionService { MyTestCompactionService( std::string db_path, Options& options, std::shared_ptr& statistics, - std::vector>& listeners) + std::vector>& listeners, + std::vector> + table_properties_collector_factories) : db_path_(std::move(db_path)), options_(options), statistics_(statistics), start_info_("na", "na", "na", 0, Env::TOTAL), wait_info_("na", "na", "na", 0, Env::TOTAL), - listeners_(listeners) {} + listeners_(listeners), + table_properties_collector_factories_( + std::move(table_properties_collector_factories)) {} static const char* kClassName() { return "MyTestCompactionService"; } @@ -78,6 +82,11 @@ class MyTestCompactionService : public CompactionService { options_override.listeners = listeners_; } + if (!table_properties_collector_factories_.empty()) { + options_override.table_properties_collector_factories = + table_properties_collector_factories_; + } + OpenAndCompactOptions options; options.canceled = &canceled_; @@ -141,6 +150,8 @@ class MyTestCompactionService : public CompactionService { bool is_override_wait_result_ = false; std::string override_wait_result_; std::vector> listeners_; + std::vector> + table_properties_collector_factories_; std::atomic_bool canceled_{false}; }; @@ -157,7 +168,8 @@ class CompactionServiceTest : public DBTestBase { compactor_statistics_ = CreateDBStatistics(); compaction_service_ = std::make_shared( - dbname_, *options, compactor_statistics_, remote_listeners); + dbname_, *options, compactor_statistics_, remote_listeners, + remote_table_properties_collector_factories); options->compaction_service = compaction_service_; DestroyAndReopen(*options); } @@ -206,6 +218,8 @@ class CompactionServiceTest : public DBTestBase { } std::vector> remote_listeners; + std::vector> + remote_table_properties_collector_factories; private: std::shared_ptr compactor_statistics_; @@ -827,6 +841,96 @@ TEST_F(CompactionServiceTest, RemoteEventListener) { } } +TEST_F(CompactionServiceTest, TablePropertiesCollector) { + const static std::string kUserPropertyName = "TestCount"; + + class TablePropertiesCollectorTest : public TablePropertiesCollector { + public: + Status Finish(UserCollectedProperties* properties) override { + *properties = UserCollectedProperties{ + {kUserPropertyName, std::to_string(count_)}, + }; + return Status::OK(); + } + + UserCollectedProperties GetReadableProperties() const override { + return UserCollectedProperties(); + } + + const char* Name() const override { return "TablePropertiesCollectorTest"; } + + Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/, + EntryType /*type*/, SequenceNumber /*seq*/, + uint64_t /*file_size*/) override { + count_++; + return Status::OK(); + } + + private: + uint32_t count_ = 0; + }; + + class TablePropertiesCollectorFactoryTest + : public TablePropertiesCollectorFactory { + public: + TablePropertiesCollector* CreateTablePropertiesCollector( + TablePropertiesCollectorFactory::Context /*context*/) override { + return new TablePropertiesCollectorTest(); + } + + const char* Name() const override { + return "TablePropertiesCollectorFactoryTest"; + } + }; + + auto factory = new TablePropertiesCollectorFactoryTest(); + remote_table_properties_collector_factories.emplace_back(factory); + + const int kNumSst = 3; + const int kLevel0Trigger = 4; + Options options = CurrentOptions(); + options.level0_file_num_compaction_trigger = kLevel0Trigger; + ReopenWithCompactionService(&options); + + // generate a few SSTs locally which should not have user property + for (int i = 0; i < kNumSst; i++) { + for (int j = 0; j < 100; j++) { + ASSERT_OK(Put(Key(i * 10 + j), "value")); + } + ASSERT_OK(Flush()); + } + + TablePropertiesCollection fname_to_props; + ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props)); + for (const auto& file_props : fname_to_props) { + auto properties = file_props.second->user_collected_properties; + auto it = properties.find(kUserPropertyName); + ASSERT_EQ(it, properties.end()); + } + + // trigger compaction + for (int i = kNumSst; i < kLevel0Trigger; i++) { + for (int j = 0; j < 100; j++) { + ASSERT_OK(Put(Key(i * 10 + j), "value")); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); + + ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props)); + + bool has_user_property = false; + for (const auto& file_props : fname_to_props) { + auto properties = file_props.second->user_collected_properties; + auto it = properties.find(kUserPropertyName); + if (it != properties.end()) { + has_user_property = true; + ASSERT_GT(std::stoi(it->second), 0); + } + } + ASSERT_TRUE(has_user_property); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index fb93a4408..3160212f3 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -832,6 +832,8 @@ Status DB::OpenAndCompact( override_options.table_factory; compaction_input.column_family.options.sst_partitioner_factory = override_options.sst_partitioner_factory; + compaction_input.column_family.options.table_properties_collector_factories = + override_options.table_properties_collector_factories; compaction_input.db_options.listeners = override_options.listeners; std::vector column_families; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f4cbfa125..68c335364 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1973,6 +1973,11 @@ struct CompactionServiceOptionsOverride { // returned to CompactionService primary host, to collect that, the user needs // to set it here. std::shared_ptr statistics = nullptr; + + // Only compaction generated SST files use this user defined table properties + // collector. + std::vector> + table_properties_collector_factories; }; struct OpenAndCompactOptions {