Add EventListener::OnExternalFileIngested() event
Summary: Add EventListener::OnExternalFileIngested() to allow user to subscribe to external file ingestion events Closes https://github.com/facebook/rocksdb/pull/1623 Differential Revision: D4285844 Pulled By: IslamAbdelRahman fbshipit-source-id: 0b95a88
This commit is contained in:
parent
86abd221aa
commit
80474e6791
@ -6466,9 +6466,35 @@ Status DBImpl::IngestExternalFile(
|
||||
// Cleanup
|
||||
ingestion_job.Cleanup(status);
|
||||
|
||||
if (status.ok()) {
|
||||
NotifyOnExternalFileIngested(cfd, ingestion_job);
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
void DBImpl::NotifyOnExternalFileIngested(
|
||||
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (immutable_db_options_.listeners.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
|
||||
ExternalFileIngestionInfo info;
|
||||
info.cf_name = cfd->GetName();
|
||||
info.external_file_path = f.external_file_path;
|
||||
info.internal_file_path = f.internal_file_path;
|
||||
info.global_seqno = f.assigned_seqno;
|
||||
info.table_properties = f.table_properties;
|
||||
for (auto listener : immutable_db_options_.listeners) {
|
||||
listener->OnExternalFileIngested(this, info);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
}
|
||||
|
||||
void DBImpl::WaitForIngestFile() {
|
||||
mutex_.AssertHeld();
|
||||
while (num_running_ingest_file_ > 0) {
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "db/column_family.h"
|
||||
#include "db/compaction_job.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "db/external_sst_file_ingestion_job.h"
|
||||
#include "db/flush_job.h"
|
||||
#include "db/flush_scheduler.h"
|
||||
#include "db/internal_stats.h"
|
||||
@ -549,6 +550,9 @@ class DBImpl : public DB {
|
||||
void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
|
||||
const MemTableInfo& mem_table_info);
|
||||
|
||||
void NotifyOnExternalFileIngested(
|
||||
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
|
||||
|
||||
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
|
||||
|
||||
void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
|
||||
|
@ -336,6 +336,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
|
||||
|
||||
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
|
||||
|
||||
file_to_ingest->table_properties = *props;
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
|
@ -38,6 +38,8 @@ struct IngestedFileInfo {
|
||||
uint64_t num_entries;
|
||||
// Id of column family this file shoule be ingested into
|
||||
uint32_t cf_id;
|
||||
// TableProperties read from external file
|
||||
TableProperties table_properties;
|
||||
// Version of external file
|
||||
int version;
|
||||
|
||||
@ -98,6 +100,10 @@ class ExternalSstFileIngestionJob {
|
||||
|
||||
VersionEdit* edit() { return &edit_; }
|
||||
|
||||
const autovector<IngestedFileInfo>& files_to_ingest() const {
|
||||
return files_to_ingest_;
|
||||
}
|
||||
|
||||
private:
|
||||
// Open the external file and populate `file_to_ingest` with all the
|
||||
// external information we need to ingest this file.
|
||||
|
@ -28,7 +28,8 @@ class ExternalSSTFileTest : public DBTestBase {
|
||||
const Options options,
|
||||
std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
|
||||
bool allow_global_seqno = false, bool sort_data = false,
|
||||
std::map<std::string, std::string>* true_data = nullptr) {
|
||||
std::map<std::string, std::string>* true_data = nullptr,
|
||||
ColumnFamilyHandle* cfh = nullptr) {
|
||||
// Generate a file id if not provided
|
||||
if (file_id == -1) {
|
||||
file_id = last_file_id_ + 1;
|
||||
@ -51,7 +52,8 @@ class ExternalSSTFileTest : public DBTestBase {
|
||||
data.resize(uniq_iter - data.begin());
|
||||
}
|
||||
std::string file_path = sst_files_dir_ + ToString(file_id);
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator,
|
||||
cfh);
|
||||
|
||||
Status s = sst_file_writer.Open(file_path);
|
||||
if (!s.ok()) {
|
||||
@ -69,7 +71,11 @@ class ExternalSSTFileTest : public DBTestBase {
|
||||
if (s.ok()) {
|
||||
IngestExternalFileOptions ifo;
|
||||
ifo.allow_global_seqno = allow_global_seqno;
|
||||
s = db_->IngestExternalFile({file_path}, ifo);
|
||||
if (cfh) {
|
||||
s = db_->IngestExternalFile(cfh, {file_path}, ifo);
|
||||
} else {
|
||||
s = db_->IngestExternalFile({file_path}, ifo);
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok() && true_data) {
|
||||
@ -84,25 +90,29 @@ class ExternalSSTFileTest : public DBTestBase {
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<std::pair<int, std::string>> data,
|
||||
int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
|
||||
std::map<std::string, std::string>* true_data = nullptr) {
|
||||
std::map<std::string, std::string>* true_data = nullptr,
|
||||
ColumnFamilyHandle* cfh = nullptr) {
|
||||
std::vector<std::pair<std::string, std::string>> file_data;
|
||||
for (auto& entry : data) {
|
||||
file_data.emplace_back(Key(entry.first), entry.second);
|
||||
}
|
||||
return GenerateAndAddExternalFile(options, file_data, file_id,
|
||||
allow_global_seqno, sort_data, true_data);
|
||||
allow_global_seqno, sort_data, true_data,
|
||||
cfh);
|
||||
}
|
||||
|
||||
Status GenerateAndAddExternalFile(
|
||||
const Options options, std::vector<int> keys, int file_id = -1,
|
||||
bool allow_global_seqno = false, bool sort_data = false,
|
||||
std::map<std::string, std::string>* true_data = nullptr) {
|
||||
std::map<std::string, std::string>* true_data = nullptr,
|
||||
ColumnFamilyHandle* cfh = nullptr) {
|
||||
std::vector<std::pair<std::string, std::string>> file_data;
|
||||
for (auto& k : keys) {
|
||||
file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
|
||||
}
|
||||
return GenerateAndAddExternalFile(options, file_data, file_id,
|
||||
allow_global_seqno, sort_data, true_data);
|
||||
allow_global_seqno, sort_data, true_data,
|
||||
cfh);
|
||||
}
|
||||
|
||||
Status DeprecatedAddFile(const std::vector<std::string>& files,
|
||||
@ -1835,6 +1845,56 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
|
||||
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
|
||||
}
|
||||
|
||||
class TestIngestExternalFileListener : public EventListener {
|
||||
public:
|
||||
void OnExternalFileIngested(DB* db,
|
||||
const ExternalFileIngestionInfo& info) override {
|
||||
ingested_files.push_back(info);
|
||||
}
|
||||
|
||||
std::vector<ExternalFileIngestionInfo> ingested_files;
|
||||
};
|
||||
|
||||
TEST_F(ExternalSSTFileTest, IngestionListener) {
|
||||
Options options = CurrentOptions();
|
||||
TestIngestExternalFileListener* listener =
|
||||
new TestIngestExternalFileListener();
|
||||
options.listeners.emplace_back(listener);
|
||||
CreateAndReopenWithCF({"koko", "toto"}, options);
|
||||
|
||||
// Ingest into default cf
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
|
||||
handles_[0]));
|
||||
ASSERT_EQ(listener->ingested_files.size(), 1);
|
||||
ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
|
||||
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
|
||||
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
|
||||
0);
|
||||
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
|
||||
"default");
|
||||
|
||||
// Ingest into cf1
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
|
||||
handles_[1]));
|
||||
ASSERT_EQ(listener->ingested_files.size(), 2);
|
||||
ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
|
||||
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
|
||||
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
|
||||
1);
|
||||
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
|
||||
"koko");
|
||||
|
||||
// Ingest into cf2
|
||||
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
|
||||
handles_[2]));
|
||||
ASSERT_EQ(listener->ingested_files.size(), 3);
|
||||
ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
|
||||
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
|
||||
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
|
||||
2);
|
||||
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
|
||||
"toto");
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -170,6 +170,20 @@ struct MemTableInfo {
|
||||
|
||||
};
|
||||
|
||||
struct ExternalFileIngestionInfo {
|
||||
// the name of the column family
|
||||
std::string cf_name;
|
||||
// Path of the file outside the DB
|
||||
std::string external_file_path;
|
||||
// Path of the file inside the DB
|
||||
std::string internal_file_path;
|
||||
// The global sequence number assigned to keys in this file
|
||||
SequenceNumber global_seqno;
|
||||
// Table properties of the table being flushed
|
||||
TableProperties table_properties;
|
||||
};
|
||||
|
||||
|
||||
// EventListener class contains a set of call-back functions that will
|
||||
// be called when specific RocksDB event happens such as flush. It can
|
||||
// be used as a building block for developing custom features such as
|
||||
@ -291,6 +305,15 @@ class EventListener {
|
||||
virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) {
|
||||
}
|
||||
|
||||
// A call-back function for RocksDB which will be called after an external
|
||||
// file is ingested using IngestExternalFile.
|
||||
//
|
||||
// Note that the this function will run on the same thread as
|
||||
// IngestExternalFile(), if this function is blocked, IngestExternalFile()
|
||||
// will be blocked from finishing.
|
||||
virtual void OnExternalFileIngested(
|
||||
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
|
||||
|
||||
virtual ~EventListener() {}
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user