diff --git a/db/db_impl.h b/db/db_impl.h index c9640e0ef..de87d61de 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -735,6 +735,11 @@ class DBImpl : public DB { const MutableCFOptions& mutable_cf_options, int job_id, TableProperties prop); + void NotifyOnCompactionBegin(ColumnFamilyData* cfd, + Compaction *c, const Status &st, + const CompactionJobStats& job_stats, + int job_id); + void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction *c, const Status &st, const CompactionJobStats& job_stats, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 3c262ace6..6297c7bed 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -744,6 +744,69 @@ Status DBImpl::ContinueBackgroundWork() { return Status::OK(); } +void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, + Compaction *c, const Status &st, + const CompactionJobStats& job_stats, + int job_id) { +#ifndef ROCKSDB_LITE + if (immutable_db_options_.listeners.empty()) { + return; + } + mutex_.AssertHeld(); + if (shutting_down_.load(std::memory_order_acquire)) { + return; + } + Version* current = cfd->current(); + current->Ref(); + // release lock while notifying events + mutex_.Unlock(); + TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex"); + { + CompactionJobInfo info; + info.cf_name = cfd->GetName(); + info.status = st; + info.thread_id = env_->GetThreadID(); + info.job_id = job_id; + info.base_input_level = c->start_level(); + info.output_level = c->output_level(); + info.stats = job_stats; + info.table_properties = c->GetOutputTableProperties(); + info.compaction_reason = c->compaction_reason(); + info.compression = c->output_compression(); + for (size_t i = 0; i < c->num_input_levels(); ++i) { + for (const auto fmd : *c->inputs(i)) { + auto fn = TableFileName(c->immutable_cf_options()->cf_paths, + fmd->fd.GetNumber(), fmd->fd.GetPathId()); + info.input_files.push_back(fn); + if (info.table_properties.count(fn) == 0) { + std::shared_ptr tp; + auto s = current->GetTableProperties(&tp, fmd, &fn); + if (s.ok()) { + info.table_properties[fn] = tp; + } + } + } + } + for (const auto newf : c->edit()->GetNewFiles()) { + info.output_files.push_back(TableFileName( + c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(), + newf.second.fd.GetPathId())); + } + for (auto listener : immutable_db_options_.listeners) { + listener->OnCompactionBegin(this, info); + } + } + mutex_.Lock(); + current->Unref(); +#else + (void)cfd; + (void)c; + (void)st; + (void)job_stats; + (void)job_id; +#endif // ROCKSDB_LITE +} + void DBImpl::NotifyOnCompactionCompleted( ColumnFamilyData* cfd, Compaction* c, const Status& st, const CompactionJobStats& compaction_job_stats, const int job_id) { @@ -1945,6 +2008,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, compaction_job_stats.num_input_files = c->num_input_files(0); + NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, + compaction_job_stats, job_context->job_id); + for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } @@ -1969,6 +2035,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, compaction_job_stats.num_input_files = c->num_input_files(0); + NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, + compaction_job_stats, job_context->job_id); + // Move files to next level int32_t moved_files = 0; int64_t moved_bytes = 0; @@ -2068,6 +2137,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &compaction_job_stats); compaction_job.Prepare(); + NotifyOnCompactionBegin(c->column_family_data(), c.get(), status, + compaction_job_stats, job_context->job_id); + mutex_.Unlock(); compaction_job.Run(); TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); diff --git a/db/db_test2.cc b/db/db_test2.cc index 0e345f8b2..f2eae2355 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1228,7 +1228,14 @@ TEST_F(DBTest2, CompressionOptions) { class CompactionStallTestListener : public EventListener { public: - CompactionStallTestListener() : compacted_files_cnt_(0) {} + CompactionStallTestListener() : compacting_files_cnt_(0), compacted_files_cnt_(0) {} + + void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override { + ASSERT_EQ(ci.cf_name, "default"); + ASSERT_EQ(ci.base_input_level, 0); + ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum); + compacting_files_cnt_ += ci.input_files.size(); + } void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override { ASSERT_EQ(ci.cf_name, "default"); @@ -1236,6 +1243,8 @@ class CompactionStallTestListener : public EventListener { ASSERT_EQ(ci.compaction_reason, CompactionReason::kLevelL0FilesNum); compacted_files_cnt_ += ci.input_files.size(); } + + std::atomic compacting_files_cnt_; std::atomic compacted_files_cnt_; }; @@ -1244,6 +1253,8 @@ TEST_F(DBTest2, CompactionStall) { {{"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:0"}, {"DBImpl::BGWorkCompaction", "DBTest2::CompactionStall:1"}, {"DBTest2::CompactionStall:2", + "DBImpl::NotifyOnCompactionBegin::UnlockMutex"}, + {"DBTest2::CompactionStall:3", "DBImpl::NotifyOnCompactionCompleted::UnlockMutex"}}); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -1285,14 +1296,18 @@ TEST_F(DBTest2, CompactionStall) { // Wait for another compaction to be triggered TEST_SYNC_POINT("DBTest2::CompactionStall:1"); - // Hold NotifyOnCompactionCompleted in the unlock mutex section + // Hold NotifyOnCompactionBegin in the unlock mutex section TEST_SYNC_POINT("DBTest2::CompactionStall:2"); + // Hold NotifyOnCompactionCompleted in the unlock mutex section + TEST_SYNC_POINT("DBTest2::CompactionStall:3"); + dbfull()->TEST_WaitForCompact(); ASSERT_LT(NumTableFilesAtLevel(0), options.level0_file_num_compaction_trigger); ASSERT_GT(listener->compacted_files_cnt_.load(), 10 - options.level0_file_num_compaction_trigger); + ASSERT_EQ(listener->compacting_files_cnt_.load(), listener->compacted_files_cnt_.load()); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index e48642590..b55a33982 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -299,6 +299,16 @@ class EventListener { // returned value. virtual void OnTableFileDeleted(const TableFileDeletionInfo& /*info*/) {} + // A callback function to RocksDB which will be called before a + // RocksDB starts to compact. The default implementation is + // no-op. + // + // Note that the this function must be implemented in a way such that + // it should not run for an extended period of time before the function + // returns. Otherwise, RocksDB may be blocked. + virtual void OnCompactionBegin(DB* /*db*/, + const CompactionJobInfo& /*ci*/) {} + // A callback function for RocksDB which will be called whenever // a registered RocksDB compacts a file. The default implementation // is a no-op.