Add EventListener::OnExternalFileIngested() event

Summary:
Add EventListener::OnExternalFileIngested() to allow user to subscribe to external file ingestion events
Closes https://github.com/facebook/rocksdb/pull/1623

Differential Revision: D4285844

Pulled By: IslamAbdelRahman

fbshipit-source-id: 0b95a88
This commit is contained in:
Islam AbdelRahman 2016-12-06 13:56:17 -08:00
parent 7768975517
commit f04765f7cf
6 changed files with 128 additions and 7 deletions

View File

@ -6541,9 +6541,35 @@ Status DBImpl::IngestExternalFile(
// Cleanup
ingestion_job.Cleanup(status);
if (status.ok()) {
NotifyOnExternalFileIngested(cfd, ingestion_job);
}
return status;
}
void DBImpl::NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.empty()) {
return;
}
for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
ExternalFileIngestionInfo info;
info.cf_name = cfd->GetName();
info.external_file_path = f.external_file_path;
info.internal_file_path = f.internal_file_path;
info.global_seqno = f.assigned_seqno;
info.table_properties = f.table_properties;
for (auto listener : immutable_db_options_.listeners) {
listener->OnExternalFileIngested(this, info);
}
}
#endif
}
void DBImpl::WaitForIngestFile() {
mutex_.AssertHeld();
while (num_running_ingest_file_ > 0) {

View File

@ -23,6 +23,7 @@
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
@ -556,6 +557,9 @@ class DBImpl : public DB {
void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
const MemTableInfo& mem_table_info);
void NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;

View File

@ -336,6 +336,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
file_to_ingest->table_properties = *props;
return status;
}

View File

@ -38,6 +38,8 @@ struct IngestedFileInfo {
uint64_t num_entries;
// Id of column family this file shoule be ingested into
uint32_t cf_id;
// TableProperties read from external file
TableProperties table_properties;
// Version of external file
int version;
@ -98,6 +100,10 @@ class ExternalSstFileIngestionJob {
VersionEdit* edit() { return &edit_; }
const autovector<IngestedFileInfo>& files_to_ingest() const {
return files_to_ingest_;
}
private:
// Open the external file and populate `file_to_ingest` with all the
// external information we need to ingest this file.

View File

@ -28,7 +28,8 @@ class ExternalSSTFileTest : public DBTestBase {
const Options options,
std::vector<std::pair<std::string, std::string>> data, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
// Generate a file id if not provided
if (file_id == -1) {
file_id = last_file_id_ + 1;
@ -51,7 +52,8 @@ class ExternalSSTFileTest : public DBTestBase {
data.resize(uniq_iter - data.begin());
}
std::string file_path = sst_files_dir_ + ToString(file_id);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator,
cfh);
Status s = sst_file_writer.Open(file_path);
if (!s.ok()) {
@ -69,7 +71,11 @@ class ExternalSSTFileTest : public DBTestBase {
if (s.ok()) {
IngestExternalFileOptions ifo;
ifo.allow_global_seqno = allow_global_seqno;
s = db_->IngestExternalFile({file_path}, ifo);
if (cfh) {
s = db_->IngestExternalFile(cfh, {file_path}, ifo);
} else {
s = db_->IngestExternalFile({file_path}, ifo);
}
}
if (s.ok() && true_data) {
@ -84,25 +90,29 @@ class ExternalSSTFileTest : public DBTestBase {
Status GenerateAndAddExternalFile(
const Options options, std::vector<std::pair<int, std::string>> data,
int file_id = -1, bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& entry : data) {
file_data.emplace_back(Key(entry.first), entry.second);
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, sort_data, true_data);
allow_global_seqno, sort_data, true_data,
cfh);
}
Status GenerateAndAddExternalFile(
const Options options, std::vector<int> keys, int file_id = -1,
bool allow_global_seqno = false, bool sort_data = false,
std::map<std::string, std::string>* true_data = nullptr) {
std::map<std::string, std::string>* true_data = nullptr,
ColumnFamilyHandle* cfh = nullptr) {
std::vector<std::pair<std::string, std::string>> file_data;
for (auto& k : keys) {
file_data.emplace_back(Key(k), Key(k) + ToString(file_id));
}
return GenerateAndAddExternalFile(options, file_data, file_id,
allow_global_seqno, sort_data, true_data);
allow_global_seqno, sort_data, true_data,
cfh);
}
Status DeprecatedAddFile(const std::vector<std::string>& files,
@ -1835,6 +1845,56 @@ TEST_F(ExternalSSTFileTest, FileWithCFInfo) {
ASSERT_OK(db_->IngestExternalFile(handles_[0], {unknown_sst}, ifo));
}
class TestIngestExternalFileListener : public EventListener {
public:
void OnExternalFileIngested(DB* db,
const ExternalFileIngestionInfo& info) override {
ingested_files.push_back(info);
}
std::vector<ExternalFileIngestionInfo> ingested_files;
};
TEST_F(ExternalSSTFileTest, IngestionListener) {
Options options = CurrentOptions();
TestIngestExternalFileListener* listener =
new TestIngestExternalFileListener();
options.listeners.emplace_back(listener);
CreateAndReopenWithCF({"koko", "toto"}, options);
// Ingest into default cf
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[0]));
ASSERT_EQ(listener->ingested_files.size(), 1);
ASSERT_EQ(listener->ingested_files.back().cf_name, "default");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"default");
// Ingest into cf1
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[1]));
ASSERT_EQ(listener->ingested_files.size(), 2);
ASSERT_EQ(listener->ingested_files.back().cf_name, "koko");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
1);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"koko");
// Ingest into cf2
ASSERT_OK(GenerateAndAddExternalFile(options, {1, 2}, -1, true, true, nullptr,
handles_[2]));
ASSERT_EQ(listener->ingested_files.size(), 3);
ASSERT_EQ(listener->ingested_files.back().cf_name, "toto");
ASSERT_EQ(listener->ingested_files.back().global_seqno, 0);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_id,
2);
ASSERT_EQ(listener->ingested_files.back().table_properties.column_family_name,
"toto");
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

View File

@ -170,6 +170,20 @@ struct MemTableInfo {
};
struct ExternalFileIngestionInfo {
// the name of the column family
std::string cf_name;
// Path of the file outside the DB
std::string external_file_path;
// Path of the file inside the DB
std::string internal_file_path;
// The global sequence number assigned to keys in this file
SequenceNumber global_seqno;
// Table properties of the table being flushed
TableProperties table_properties;
};
// EventListener class contains a set of call-back functions that will
// be called when specific RocksDB event happens such as flush. It can
// be used as a building block for developing custom features such as
@ -291,6 +305,15 @@ class EventListener {
virtual void OnColumnFamilyHandleDeletionStarted(ColumnFamilyHandle* handle) {
}
// A call-back function for RocksDB which will be called after an external
// file is ingested using IngestExternalFile.
//
// Note that the this function will run on the same thread as
// IngestExternalFile(), if this function is blocked, IngestExternalFile()
// will be blocked from finishing.
virtual void OnExternalFileIngested(
DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
virtual ~EventListener() {}
};