diff --git a/HISTORY.md b/HISTORY.md index 052bd4835..4d2990ad8 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -18,6 +18,7 @@ * Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`. * Add new stat ASYNC_READ_BYTES that calculates number of bytes read during async read call and users can check if async code path is being called by RocksDB internal automatic prefetching for sequential reads. * Enable async prefetching if ReadOptions.readahead_size is set along with ReadOptions.async_io in FilePrefetchBuffer. +* Add event listener support on remote compaction compactor side. ### Behavior changes * Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794). diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index ef797307e..6a0630b97 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1251,7 +1251,7 @@ void CompactionJob::NotifyOnSubcompactionBegin( if (shutting_down_->load(std::memory_order_acquire)) { return; } - if (c->is_manual_compaction() && + if (c->is_manual_compaction() && manual_compaction_paused_ && manual_compaction_paused_->load(std::memory_order_acquire) > 0) { return; } diff --git a/db/compaction/compaction_service_test.cc b/db/compaction/compaction_service_test.cc index 867cb08ef..9d4a19077 100644 --- a/db/compaction/compaction_service_test.cc +++ b/db/compaction/compaction_service_test.cc @@ -12,13 +12,16 @@ namespace ROCKSDB_NAMESPACE { class MyTestCompactionService : public CompactionService { public: - MyTestCompactionService(std::string db_path, Options& options, - std::shared_ptr& statistics) + MyTestCompactionService( + std::string db_path, Options& options, + std::shared_ptr& statistics, + std::vector>& listeners) : db_path_(std::move(db_path)), options_(options), statistics_(statistics), start_info_("na", "na", "na", 0, Env::TOTAL), - wait_info_("na", "na", "na", 0, Env::TOTAL) {} + wait_info_("na", "na", "na", 0, Env::TOTAL), + listeners_(listeners) {} static const char* kClassName() { return "MyTestCompactionService"; } @@ -71,6 +74,9 @@ class MyTestCompactionService : public CompactionService { options_override.table_factory = options_.table_factory; options_override.sst_partitioner_factory = options_.sst_partitioner_factory; options_override.statistics = statistics_; + if (!listeners_.empty()) { + options_override.listeners = listeners_; + } Status s = DB::OpenAndCompact( db_path_, db_path_ + "/" + ROCKSDB_NAMESPACE::ToString(info.job_id), @@ -129,6 +135,7 @@ class MyTestCompactionService : public CompactionService { CompactionServiceJobStatus::kFailure; bool is_override_wait_result_ = false; std::string override_wait_result_; + std::vector> listeners_; }; class CompactionServiceTest : public DBTestBase { @@ -144,7 +151,7 @@ class CompactionServiceTest : public DBTestBase { compactor_statistics_ = CreateDBStatistics(); compaction_service_ = std::make_shared( - dbname_, *options, compactor_statistics_); + dbname_, *options, compactor_statistics_, remote_listeners); options->compaction_service = compaction_service_; DestroyAndReopen(*options); } @@ -192,6 +199,8 @@ class CompactionServiceTest : public DBTestBase { } } + std::vector> remote_listeners; + private: std::shared_ptr compactor_statistics_; std::shared_ptr primary_statistics_; @@ -685,6 +694,88 @@ TEST_F(CompactionServiceTest, FallbackLocalManual) { VerifyTestData(); } +TEST_F(CompactionServiceTest, RemoteEventListener) { + class RemoteEventListenerTest : public EventListener { + public: + const char* Name() const override { return "RemoteEventListenerTest"; } + + void OnSubcompactionBegin(const SubcompactionJobInfo& info) override { + auto result = on_going_compactions.emplace(info.job_id); + ASSERT_TRUE(result.second); // make sure there's no duplication + compaction_num++; + EventListener::OnSubcompactionBegin(info); + } + void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override { + auto num = on_going_compactions.erase(info.job_id); + ASSERT_TRUE(num == 1); // make sure the compaction id exists + EventListener::OnSubcompactionCompleted(info); + } + void OnTableFileCreated(const TableFileCreationInfo& info) override { + ASSERT_EQ(on_going_compactions.count(info.job_id), 1); + file_created++; + EventListener::OnTableFileCreated(info); + } + void OnTableFileCreationStarted( + const TableFileCreationBriefInfo& info) override { + ASSERT_EQ(on_going_compactions.count(info.job_id), 1); + file_creation_started++; + EventListener::OnTableFileCreationStarted(info); + } + + bool ShouldBeNotifiedOnFileIO() override { + file_io_notified++; + return EventListener::ShouldBeNotifiedOnFileIO(); + } + + std::atomic_uint64_t file_io_notified{0}; + std::atomic_uint64_t file_creation_started{0}; + std::atomic_uint64_t file_created{0}; + + std::set on_going_compactions; // store the job_id + std::atomic_uint64_t compaction_num{0}; + }; + + auto listener = new RemoteEventListenerTest(); + remote_listeners.emplace_back(listener); + + Options options = CurrentOptions(); + ReopenWithCompactionService(&options); + + for (int i = 0; i < 20; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 10 + j; + ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 10; j++) { + int key_id = i * 20 + j * 2; + ASSERT_OK(Put(Key(key_id), "value_new" + ToString(key_id))); + } + ASSERT_OK(Flush()); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // check the events are triggered + ASSERT_TRUE(listener->file_io_notified > 0); + ASSERT_TRUE(listener->file_creation_started > 0); + ASSERT_TRUE(listener->file_created > 0); + ASSERT_TRUE(listener->compaction_num > 0); + ASSERT_TRUE(listener->on_going_compactions.empty()); + + // verify result + for (int i = 0; i < 200; i++) { + auto result = Get(Key(i)); + if (i % 2) { + ASSERT_EQ(result, "value" + ToString(i)); + } else { + ASSERT_EQ(result, "value_new" + ToString(i)); + } + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index 37b2955b3..8f1051edb 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -825,6 +825,7 @@ Status DB::OpenAndCompact( override_options.table_factory; compaction_input.column_family.options.sst_partitioner_factory = override_options.sst_partitioner_factory; + compaction_input.db_options.listeners = override_options.listeners; std::vector column_families; column_families.push_back(compaction_input.column_family); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9e9305e9c..727a543df 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1932,6 +1932,14 @@ struct CompactionServiceOptionsOverride { std::shared_ptr table_factory; std::shared_ptr sst_partitioner_factory = nullptr; + // Only subsets of events are triggered in remote compaction worker, like: + // `OnTableFileCreated`, `OnTableFileCreationStarted`, + // `ShouldBeNotifiedOnFileIO` `OnSubcompactionBegin`, + // `OnSubcompactionCompleted`, etc. Worth mentioning, `OnCompactionBegin` and + // `OnCompactionCompleted` won't be triggered. They will be triggered on the + // primary DB side. + std::vector> listeners; + // statistics is used to collect DB operation metrics, the metrics won't be // returned to CompactionService primary host, to collect that, the user needs // to set it here.