Add TablePropertiesCollector support in SstFileWriter
Summary: Update SstFileWriter to use user TablePropertiesCollectors that are passed in Options Test Plan: unittests Reviewers: sdong Reviewed By: sdong Subscribers: jkedgar, andrewkr, hermanlee4, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D62253
This commit is contained in:
parent
78837f5d61
commit
6a17b07ca8
@ -926,6 +926,52 @@ TEST_F(DBSSTTest, AddExternalSstFile) {
|
|||||||
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
|
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
|
||||||
kSkipFIFOCompaction));
|
kSkipFIFOCompaction));
|
||||||
}
|
}
|
||||||
|
class SstFileWriterCollector : public TablePropertiesCollector {
|
||||||
|
public:
|
||||||
|
explicit SstFileWriterCollector(const std::string prefix) : prefix_(prefix) {
|
||||||
|
name_ = prefix_ + "_SstFileWriterCollector";
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* Name() const override { return name_.c_str(); }
|
||||||
|
|
||||||
|
Status Finish(UserCollectedProperties* properties) override {
|
||||||
|
*properties = UserCollectedProperties{
|
||||||
|
{prefix_ + "_SstFileWriterCollector", "YES"},
|
||||||
|
{prefix_ + "_Count", std::to_string(count_)},
|
||||||
|
};
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type,
|
||||||
|
SequenceNumber seq, uint64_t file_size) override {
|
||||||
|
++count_;
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual UserCollectedProperties GetReadableProperties() const override {
|
||||||
|
return UserCollectedProperties{};
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
uint32_t count_ = 0;
|
||||||
|
std::string prefix_;
|
||||||
|
std::string name_;
|
||||||
|
};
|
||||||
|
|
||||||
|
class SstFileWriterCollectorFactory : public TablePropertiesCollectorFactory {
|
||||||
|
public:
|
||||||
|
explicit SstFileWriterCollectorFactory(std::string prefix)
|
||||||
|
: prefix_(prefix), num_created_(0) {}
|
||||||
|
virtual TablePropertiesCollector* CreateTablePropertiesCollector(
|
||||||
|
TablePropertiesCollectorFactory::Context context) override {
|
||||||
|
num_created_++;
|
||||||
|
return new SstFileWriterCollector(prefix_);
|
||||||
|
}
|
||||||
|
const char* Name() const override { return "SstFileWriterCollectorFactory"; }
|
||||||
|
|
||||||
|
std::string prefix_;
|
||||||
|
uint32_t num_created_;
|
||||||
|
};
|
||||||
|
|
||||||
TEST_F(DBSSTTest, AddExternalSstFileList) {
|
TEST_F(DBSSTTest, AddExternalSstFileList) {
|
||||||
do {
|
do {
|
||||||
@ -934,6 +980,12 @@ TEST_F(DBSSTTest, AddExternalSstFileList) {
|
|||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
options.env = env_;
|
options.env = env_;
|
||||||
|
|
||||||
|
auto abc_collector = std::make_shared<SstFileWriterCollectorFactory>("abc");
|
||||||
|
auto xyz_collector = std::make_shared<SstFileWriterCollectorFactory>("xyz");
|
||||||
|
|
||||||
|
options.table_properties_collector_factories.emplace_back(abc_collector);
|
||||||
|
options.table_properties_collector_factories.emplace_back(xyz_collector);
|
||||||
|
|
||||||
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
|
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
|
||||||
|
|
||||||
// file1.sst (0 => 99)
|
// file1.sst (0 => 99)
|
||||||
@ -1042,6 +1094,17 @@ TEST_F(DBSSTTest, AddExternalSstFileList) {
|
|||||||
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
|
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TablePropertiesCollection props;
|
||||||
|
ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
|
||||||
|
ASSERT_EQ(props.size(), 2);
|
||||||
|
for (auto file_props : props) {
|
||||||
|
auto user_props = file_props.second->user_collected_properties;
|
||||||
|
ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
|
||||||
|
ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
|
||||||
|
ASSERT_EQ(user_props["abc_Count"], "100");
|
||||||
|
ASSERT_EQ(user_props["xyz_Count"], "100");
|
||||||
|
}
|
||||||
|
|
||||||
// Add file while holding a snapshot will fail
|
// Add file while holding a snapshot will fail
|
||||||
const Snapshot* s1 = db_->GetSnapshot();
|
const Snapshot* s1 = db_->GetSnapshot();
|
||||||
if (s1 != nullptr) {
|
if (s1 != nullptr) {
|
||||||
@ -1055,6 +1118,16 @@ TEST_F(DBSSTTest, AddExternalSstFileList) {
|
|||||||
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
|
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
|
||||||
|
ASSERT_EQ(props.size(), 3);
|
||||||
|
for (auto file_props : props) {
|
||||||
|
auto user_props = file_props.second->user_collected_properties;
|
||||||
|
ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
|
||||||
|
ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
|
||||||
|
ASSERT_EQ(user_props["abc_Count"], "100");
|
||||||
|
ASSERT_EQ(user_props["xyz_Count"], "100");
|
||||||
|
}
|
||||||
|
|
||||||
// This file list has overlapping values with the exisitng data
|
// This file list has overlapping values with the exisitng data
|
||||||
s = db_->AddFile(file_list3);
|
s = db_->AddFile(file_list3);
|
||||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||||
|
@ -117,9 +117,20 @@ Status SstFileWriter::Open(const std::string& file_path) {
|
|||||||
|
|
||||||
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
|
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
|
||||||
int_tbl_prop_collector_factories;
|
int_tbl_prop_collector_factories;
|
||||||
|
|
||||||
|
// SstFileWriter properties collector to add SstFileWriter version.
|
||||||
int_tbl_prop_collector_factories.emplace_back(
|
int_tbl_prop_collector_factories.emplace_back(
|
||||||
new SstFileWriterPropertiesCollectorFactory(1 /* version */));
|
new SstFileWriterPropertiesCollectorFactory(1 /* version */));
|
||||||
|
|
||||||
|
// User collector factories
|
||||||
|
auto user_collector_factories =
|
||||||
|
r->ioptions.table_properties_collector_factories;
|
||||||
|
for (size_t i = 0; i < user_collector_factories.size(); i++) {
|
||||||
|
int_tbl_prop_collector_factories.emplace_back(
|
||||||
|
new UserKeyTablePropertiesCollectorFactory(
|
||||||
|
user_collector_factories[i]));
|
||||||
|
}
|
||||||
|
|
||||||
TableBuilderOptions table_builder_options(
|
TableBuilderOptions table_builder_options(
|
||||||
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
|
r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
|
||||||
compression_type, r->ioptions.compression_opts,
|
compression_type, r->ioptions.compression_opts,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user