Add table_properties_collector_factories override (#9995)
Summary: Add table_properties_collector_factories override on the remote side. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9995 Test Plan: unittest added Reviewed By: ajkr Differential Revision: D36392623 Pulled By: jay-zhuang fbshipit-source-id: 3ba031294d90247ca063d7de7b43178d38e3f66a
This commit is contained in:
parent
0070680cfd
commit
b84e3363f5
@ -10,6 +10,7 @@
|
||||
### New Features
|
||||
* DB::GetLiveFilesStorageInfo is ready for production use.
|
||||
* Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion.
|
||||
* RemoteCompaction supports table_properties_collector_factories override on compaction worker.
|
||||
|
||||
### Public API changes
|
||||
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
|
||||
|
@ -15,13 +15,17 @@ class MyTestCompactionService : public CompactionService {
|
||||
MyTestCompactionService(
|
||||
std::string db_path, Options& options,
|
||||
std::shared_ptr<Statistics>& statistics,
|
||||
std::vector<std::shared_ptr<EventListener>>& listeners)
|
||||
std::vector<std::shared_ptr<EventListener>>& listeners,
|
||||
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
||||
table_properties_collector_factories)
|
||||
: db_path_(std::move(db_path)),
|
||||
options_(options),
|
||||
statistics_(statistics),
|
||||
start_info_("na", "na", "na", 0, Env::TOTAL),
|
||||
wait_info_("na", "na", "na", 0, Env::TOTAL),
|
||||
listeners_(listeners) {}
|
||||
listeners_(listeners),
|
||||
table_properties_collector_factories_(
|
||||
std::move(table_properties_collector_factories)) {}
|
||||
|
||||
static const char* kClassName() { return "MyTestCompactionService"; }
|
||||
|
||||
@ -78,6 +82,11 @@ class MyTestCompactionService : public CompactionService {
|
||||
options_override.listeners = listeners_;
|
||||
}
|
||||
|
||||
if (!table_properties_collector_factories_.empty()) {
|
||||
options_override.table_properties_collector_factories =
|
||||
table_properties_collector_factories_;
|
||||
}
|
||||
|
||||
OpenAndCompactOptions options;
|
||||
options.canceled = &canceled_;
|
||||
|
||||
@ -141,6 +150,8 @@ class MyTestCompactionService : public CompactionService {
|
||||
bool is_override_wait_result_ = false;
|
||||
std::string override_wait_result_;
|
||||
std::vector<std::shared_ptr<EventListener>> listeners_;
|
||||
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
||||
table_properties_collector_factories_;
|
||||
std::atomic_bool canceled_{false};
|
||||
};
|
||||
|
||||
@ -157,7 +168,8 @@ class CompactionServiceTest : public DBTestBase {
|
||||
compactor_statistics_ = CreateDBStatistics();
|
||||
|
||||
compaction_service_ = std::make_shared<MyTestCompactionService>(
|
||||
dbname_, *options, compactor_statistics_, remote_listeners);
|
||||
dbname_, *options, compactor_statistics_, remote_listeners,
|
||||
remote_table_properties_collector_factories);
|
||||
options->compaction_service = compaction_service_;
|
||||
DestroyAndReopen(*options);
|
||||
}
|
||||
@ -206,6 +218,8 @@ class CompactionServiceTest : public DBTestBase {
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<EventListener>> remote_listeners;
|
||||
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
||||
remote_table_properties_collector_factories;
|
||||
|
||||
private:
|
||||
std::shared_ptr<Statistics> compactor_statistics_;
|
||||
@ -827,6 +841,96 @@ TEST_F(CompactionServiceTest, RemoteEventListener) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(CompactionServiceTest, TablePropertiesCollector) {
|
||||
const static std::string kUserPropertyName = "TestCount";
|
||||
|
||||
class TablePropertiesCollectorTest : public TablePropertiesCollector {
|
||||
public:
|
||||
Status Finish(UserCollectedProperties* properties) override {
|
||||
*properties = UserCollectedProperties{
|
||||
{kUserPropertyName, std::to_string(count_)},
|
||||
};
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
UserCollectedProperties GetReadableProperties() const override {
|
||||
return UserCollectedProperties();
|
||||
}
|
||||
|
||||
const char* Name() const override { return "TablePropertiesCollectorTest"; }
|
||||
|
||||
Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
|
||||
EntryType /*type*/, SequenceNumber /*seq*/,
|
||||
uint64_t /*file_size*/) override {
|
||||
count_++;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
uint32_t count_ = 0;
|
||||
};
|
||||
|
||||
class TablePropertiesCollectorFactoryTest
|
||||
: public TablePropertiesCollectorFactory {
|
||||
public:
|
||||
TablePropertiesCollector* CreateTablePropertiesCollector(
|
||||
TablePropertiesCollectorFactory::Context /*context*/) override {
|
||||
return new TablePropertiesCollectorTest();
|
||||
}
|
||||
|
||||
const char* Name() const override {
|
||||
return "TablePropertiesCollectorFactoryTest";
|
||||
}
|
||||
};
|
||||
|
||||
auto factory = new TablePropertiesCollectorFactoryTest();
|
||||
remote_table_properties_collector_factories.emplace_back(factory);
|
||||
|
||||
const int kNumSst = 3;
|
||||
const int kLevel0Trigger = 4;
|
||||
Options options = CurrentOptions();
|
||||
options.level0_file_num_compaction_trigger = kLevel0Trigger;
|
||||
ReopenWithCompactionService(&options);
|
||||
|
||||
// generate a few SSTs locally which should not have user property
|
||||
for (int i = 0; i < kNumSst; i++) {
|
||||
for (int j = 0; j < 100; j++) {
|
||||
ASSERT_OK(Put(Key(i * 10 + j), "value"));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
|
||||
TablePropertiesCollection fname_to_props;
|
||||
ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
|
||||
for (const auto& file_props : fname_to_props) {
|
||||
auto properties = file_props.second->user_collected_properties;
|
||||
auto it = properties.find(kUserPropertyName);
|
||||
ASSERT_EQ(it, properties.end());
|
||||
}
|
||||
|
||||
// trigger compaction
|
||||
for (int i = kNumSst; i < kLevel0Trigger; i++) {
|
||||
for (int j = 0; j < 100; j++) {
|
||||
ASSERT_OK(Put(Key(i * 10 + j), "value"));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
|
||||
|
||||
ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
|
||||
|
||||
bool has_user_property = false;
|
||||
for (const auto& file_props : fname_to_props) {
|
||||
auto properties = file_props.second->user_collected_properties;
|
||||
auto it = properties.find(kUserPropertyName);
|
||||
if (it != properties.end()) {
|
||||
has_user_property = true;
|
||||
ASSERT_GT(std::stoi(it->second), 0);
|
||||
}
|
||||
}
|
||||
ASSERT_TRUE(has_user_property);
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -832,6 +832,8 @@ Status DB::OpenAndCompact(
|
||||
override_options.table_factory;
|
||||
compaction_input.column_family.options.sst_partitioner_factory =
|
||||
override_options.sst_partitioner_factory;
|
||||
compaction_input.column_family.options.table_properties_collector_factories =
|
||||
override_options.table_properties_collector_factories;
|
||||
compaction_input.db_options.listeners = override_options.listeners;
|
||||
|
||||
std::vector<ColumnFamilyDescriptor> column_families;
|
||||
|
@ -1973,6 +1973,11 @@ struct CompactionServiceOptionsOverride {
|
||||
// returned to CompactionService primary host, to collect that, the user needs
|
||||
// to set it here.
|
||||
std::shared_ptr<Statistics> statistics = nullptr;
|
||||
|
||||
// Only compaction generated SST files use this user defined table properties
|
||||
// collector.
|
||||
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
||||
table_properties_collector_factories;
|
||||
};
|
||||
|
||||
struct OpenAndCompactOptions {
|
||||
|
Loading…
Reference in New Issue
Block a user