diff --git a/db/db_impl.cc b/db/db_impl.cc index 4e3d7dcb9..b7d6fbcbc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6541,9 +6541,35 @@ Status DBImpl::IngestExternalFile( // Cleanup ingestion_job.Cleanup(status); + if (status.ok()) { + NotifyOnExternalFileIngested(cfd, ingestion_job); + } + return status; } +void DBImpl::NotifyOnExternalFileIngested( + ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) { +#ifndef ROCKSDB_LITE + if (immutable_db_options_.listeners.empty()) { + return; + } + + for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) { + ExternalFileIngestionInfo info; + info.cf_name = cfd->GetName(); + info.external_file_path = f.external_file_path; + info.internal_file_path = f.internal_file_path; + info.global_seqno = f.assigned_seqno; + info.table_properties = f.table_properties; + for (auto listener : immutable_db_options_.listeners) { + listener->OnExternalFileIngested(this, info); + } + } + +#endif +} + void DBImpl::WaitForIngestFile() { mutex_.AssertHeld(); while (num_running_ingest_file_ > 0) { diff --git a/db/db_impl.h b/db/db_impl.h index cfedd8103..c87299b0f 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -23,6 +23,7 @@ #include "db/column_family.h" #include "db/compaction_job.h" #include "db/dbformat.h" +#include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" #include "db/internal_stats.h" @@ -556,6 +557,9 @@ class DBImpl : public DB { void NotifyOnMemTableSealed(ColumnFamilyData* cfd, const MemTableInfo& mem_table_info); + void NotifyOnExternalFileIngested( + ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job); + void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const; void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 543ff48f7..688c68215 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -336,6 +336,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( file_to_ingest->cf_id = static_cast(props->column_family_id); + file_to_ingest->table_properties = *props; + return status; } diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 954dcb98b..d711d5dc2 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -38,6 +38,8 @@ struct IngestedFileInfo { uint64_t num_entries; // Id of column family this file shoule be ingested into uint32_t cf_id; + // TableProperties read from external file + TableProperties table_properties; // Version of external file int version; @@ -98,6 +100,10 @@ class ExternalSstFileIngestionJob { VersionEdit* edit() { return &edit_; } + const autovector& files_to_ingest() const { + return files_to_ingest_; + } + private: // Open the external file and populate `file_to_ingest` with all the // external information we need to ingest this file. diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 3120d43da..fc57eedee 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -28,7 +28,8 @@ class ExternalSSTFileTest : public DBTestBase { const Options options, std::vector> data, int file_id = -1, bool allow_global_seqno = false, bool sort_data = false, - std::map* true_data = nullptr) { + std::map* true_data = nullptr, + ColumnFamilyHandle* cfh = nullptr) { // Generate a file id if not provided if (file_id == -1) { file_id = last_file_id_ + 1; @@ -51,7 +52,8 @@ class ExternalSSTFileTest : public DBTestBase { data.resize(uniq_iter - data.begin()); } std::string file_path = sst_files_dir_ + ToString(file_id); - SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator, + cfh); Status s = sst_file_writer.Open(file_path); if (!s.ok()) { @@ -69,7 +71,11 @@ class ExternalSSTFileTest : public DBTestBase { if (s.ok()) { IngestExternalFileOptions ifo; ifo.allow_global_seqno = allow_global_seqno; - s = db_->IngestExternalFile({file_path}, ifo); + if (cfh) { + s = db_->IngestExternalFile(cfh, {file_path}, ifo); + } else { + s = db_->IngestExternalFile({file_path}, ifo); + } } if (s.ok() && true_data) { @@ -84,25 +90,29 @@ class ExternalSSTFileTest : public DBTestBase { Status GenerateAndAddExternalFile( const Options options, std::vector> data, int file_id = -1, bool allow_global_seqno = false, bool sort_data = false, - std::map* true_data = nullptr) { + std::map* true_data = nullptr, + ColumnFamilyHandle* cfh = nullptr) { std::vector> file_data; for (auto& entry : data) { file_data.emplace_back(Key(entry.first), entry.second); } return GenerateAndAddExternalFile(options, file_data, file_id, - allow_global_seqno, sort_data, true_data); + allow_global_seqno, sort_data, true_data, + cfh); } Status GenerateAndAddExternalFile( const Options options, std::vector keys, int file_id = -1, bool allow_global_seqno = false, bool sort_data = false, - std::map* true_data = nullptr) { + std::map* true_data = nullptr, + ColumnFamilyHandle* cfh = nullptr) { std::vector> file_data; for (auto& k : keys) { file_data.emplace_back(Key(k), Key(k) + ToString(file_id)); } return GenerateAndAddExternalFile(options, file_data, file_id, - allow_global_seqno, sort_data, true_data); + allow_global_seqno, sort_data, true_data, + cfh); } Status DeprecatedAddFile(const std::vector& files, @@ -1835,6 +1845,56 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) { ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo)); } +class TestIngestExternalFileListener : public EventListener { + public: + void OnExternalFileIngested(DB* db, + const ExternalFileIngestionInfo& info) override { + ingested_files.push_back(info); + } + + std::vector ingested_files; +}; + +TEST_F(ExternalSSTFileTest, IngestionListener) { + Options options = CurrentOptions(); + TestIngestExternalFileListener* listener = + new TestIngestExternalFileListener(); + options.listeners.emplace_back(listener); + CreateAndReopenWithCF({"koko", "toto"}, options); + + // Ingest into default cf + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr, + handles_[0])); + ASSERT_EQ(listener->ingested_files.size(), 1); + ASSERT_EQ(listener->ingested_files.back().cf_name, "default"); + ASSERT_EQ(listener->ingested_files.back().global_seqno, 0); + ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id, + 0); + ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name, + "default"); + + // Ingest into cf1 + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr, + handles_[1])); + ASSERT_EQ(listener->ingested_files.size(), 2); + ASSERT_EQ(listener->ingested_files.back().cf_name, "koko"); + ASSERT_EQ(listener->ingested_files.back().global_seqno, 0); + ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id, + 1); + ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name, + "koko"); + + // Ingest into cf2 + ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr, + handles_[2])); + ASSERT_EQ(listener->ingested_files.size(), 3); + ASSERT_EQ(listener->ingested_files.back().cf_name, "toto"); + ASSERT_EQ(listener->ingested_files.back().global_seqno, 0); + ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id, + 2); + ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name, + "toto"); +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 06a48cad2..37a3961bb 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -170,6 +170,20 @@ struct MemTableInfo { }; +struct ExternalFileIngestionInfo { + // the name of the column family + std::string cf_name; + // Path of the file outside the DB + std::string external_file_path; + // Path of the file inside the DB + std::string internal_file_path; + // The global sequence number assigned to keys in this file + SequenceNumber global_seqno; + // Table properties of the table being flushed + TableProperties table_properties; +}; + + // EventListener class contains a set of call-back functions that will // be called when specific RocksDB event happens such as flush. It can // be used as a building block for developing custom features such as @@ -291,6 +305,15 @@ class EventListener { virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) { } + // A call-back function for RocksDB which will be called after an external + // file is ingested using IngestExternalFile. + // + // Note that the this function will run on the same thread as + // IngestExternalFile(), if this function is blocked, IngestExternalFile() + // will be blocked from finishing. + virtual void OnExternalFileIngested( + DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {} + virtual ~EventListener() {} };