Add subcompaction event API (#9311)
Summary: Add event callback for subcompaction and adds a sub_job_id to identify it. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9311 Reviewed By: ajkr Differential Revision: D33892707 Pulled By: jay-zhuang fbshipit-source-id: 57b5e5e594d61b2112d480c18a79a36751f65a4e
This commit is contained in:
parent
a86ee02d34
commit
f092f0fa5d
@ -403,7 +403,7 @@ endif()
|
|||||||
|
|
||||||
option(WITH_TSAN "build with TSAN" OFF)
|
option(WITH_TSAN "build with TSAN" OFF)
|
||||||
if(WITH_TSAN)
|
if(WITH_TSAN)
|
||||||
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread -pie")
|
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread -Wl,-pie")
|
||||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fPIC")
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fPIC")
|
||||||
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=thread -fPIC")
|
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fsanitize=thread -fPIC")
|
||||||
if(WITH_JEMALLOC)
|
if(WITH_JEMALLOC)
|
||||||
|
@ -68,6 +68,7 @@
|
|||||||
* Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported().
|
* Add Transaction::SetReadTimestampForValidation() and Transaction::SetCommitTimestamp(). Default impl returns NotSupported().
|
||||||
* Add support for decimal patterns to ObjectLibrary::PatternEntry
|
* Add support for decimal patterns to ObjectLibrary::PatternEntry
|
||||||
* Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc.
|
* Remove deprecated remote compaction APIs `CompactionService::Start()` and `CompactionService::WaitForComplete()`. Please use `CompactionService::StartV2()`, `CompactionService::WaitForCompleteV2()` instead, which provides the same information plus extra data like priority, db_id, etc.
|
||||||
|
* Add subcompaction callback APIs: `OnSubcompactionBegin()` and `OnSubcompactionCompleted()`.
|
||||||
|
|
||||||
### Behavior Changes
|
### Behavior Changes
|
||||||
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.
|
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.
|
||||||
|
@ -292,7 +292,7 @@ class Compaction {
|
|||||||
|
|
||||||
int GetInputBaseLevel() const;
|
int GetInputBaseLevel() const;
|
||||||
|
|
||||||
CompactionReason compaction_reason() { return compaction_reason_; }
|
CompactionReason compaction_reason() const { return compaction_reason_; }
|
||||||
|
|
||||||
const std::vector<FileMetaData*>& grandparents() const {
|
const std::vector<FileMetaData*>& grandparents() const {
|
||||||
return grandparents_;
|
return grandparents_;
|
||||||
|
@ -195,6 +195,10 @@ struct CompactionJob::SubcompactionState {
|
|||||||
// within the same compaction job.
|
// within the same compaction job.
|
||||||
const uint32_t sub_job_id;
|
const uint32_t sub_job_id;
|
||||||
|
|
||||||
|
// Notify on sub-compaction completion only if listener was notified on
|
||||||
|
// sub-compaction begin.
|
||||||
|
bool notify_on_subcompaction_completion = false;
|
||||||
|
|
||||||
SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size,
|
SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size,
|
||||||
uint32_t _sub_job_id)
|
uint32_t _sub_job_id)
|
||||||
: compaction(c),
|
: compaction(c),
|
||||||
@ -1215,8 +1219,82 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
|
|||||||
compaction_result.bytes_written);
|
compaction_result.bytes_written);
|
||||||
return CompactionServiceJobStatus::kSuccess;
|
return CompactionServiceJobStatus::kSuccess;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CompactionJob::BuildSubcompactionJobInfo(
|
||||||
|
SubcompactionState* sub_compact,
|
||||||
|
SubcompactionJobInfo* subcompaction_job_info) const {
|
||||||
|
Compaction* c = compact_->compaction;
|
||||||
|
ColumnFamilyData* cfd = c->column_family_data();
|
||||||
|
|
||||||
|
subcompaction_job_info->cf_id = cfd->GetID();
|
||||||
|
subcompaction_job_info->cf_name = cfd->GetName();
|
||||||
|
subcompaction_job_info->status = sub_compact->status;
|
||||||
|
subcompaction_job_info->thread_id = env_->GetThreadID();
|
||||||
|
subcompaction_job_info->job_id = job_id_;
|
||||||
|
subcompaction_job_info->subcompaction_job_id = sub_compact->sub_job_id;
|
||||||
|
subcompaction_job_info->base_input_level = c->start_level();
|
||||||
|
subcompaction_job_info->output_level = c->output_level();
|
||||||
|
subcompaction_job_info->stats = sub_compact->compaction_job_stats;
|
||||||
|
}
|
||||||
#endif // !ROCKSDB_LITE
|
#endif // !ROCKSDB_LITE
|
||||||
|
|
||||||
|
void CompactionJob::NotifyOnSubcompactionBegin(
|
||||||
|
SubcompactionState* sub_compact) {
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
Compaction* c = compact_->compaction;
|
||||||
|
|
||||||
|
if (db_options_.listeners.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (shutting_down_->load(std::memory_order_acquire)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (c->is_manual_compaction() &&
|
||||||
|
manual_compaction_paused_->load(std::memory_order_acquire) > 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub_compact->notify_on_subcompaction_completion = true;
|
||||||
|
|
||||||
|
SubcompactionJobInfo info{};
|
||||||
|
BuildSubcompactionJobInfo(sub_compact, &info);
|
||||||
|
|
||||||
|
for (auto listener : db_options_.listeners) {
|
||||||
|
listener->OnSubcompactionBegin(info);
|
||||||
|
}
|
||||||
|
info.status.PermitUncheckedError();
|
||||||
|
|
||||||
|
#else
|
||||||
|
(void)sub_compact;
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactionJob::NotifyOnSubcompactionCompleted(
|
||||||
|
SubcompactionState* sub_compact) {
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
|
||||||
|
if (db_options_.listeners.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (shutting_down_->load(std::memory_order_acquire)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sub_compact->notify_on_subcompaction_completion == false) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SubcompactionJobInfo info{};
|
||||||
|
BuildSubcompactionJobInfo(sub_compact, &info);
|
||||||
|
|
||||||
|
for (auto listener : db_options_.listeners) {
|
||||||
|
listener->OnSubcompactionCompleted(info);
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
(void)sub_compact;
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
|
}
|
||||||
|
|
||||||
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
||||||
assert(sub_compact);
|
assert(sub_compact);
|
||||||
assert(sub_compact->compaction);
|
assert(sub_compact->compaction);
|
||||||
@ -1255,6 +1333,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NotifyOnSubcompactionBegin(sub_compact);
|
||||||
|
|
||||||
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
|
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
|
||||||
existing_snapshots_);
|
existing_snapshots_);
|
||||||
|
|
||||||
@ -1614,6 +1694,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
|
|||||||
clip.reset();
|
clip.reset();
|
||||||
raw_input.reset();
|
raw_input.reset();
|
||||||
sub_compact->status = status;
|
sub_compact->status = status;
|
||||||
|
NotifyOnSubcompactionCompleted(sub_compact);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) {
|
uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) {
|
||||||
|
@ -167,6 +167,16 @@ class CompactionJob {
|
|||||||
void UpdateCompactionInputStatsHelper(
|
void UpdateCompactionInputStatsHelper(
|
||||||
int* num_files, uint64_t* bytes_read, int input_level);
|
int* num_files, uint64_t* bytes_read, int input_level);
|
||||||
|
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
void BuildSubcompactionJobInfo(
|
||||||
|
SubcompactionState* sub_compact,
|
||||||
|
SubcompactionJobInfo* subcompaction_job_info) const;
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
|
void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);
|
||||||
|
|
||||||
|
void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);
|
||||||
|
|
||||||
uint32_t job_id_;
|
uint32_t job_id_;
|
||||||
|
|
||||||
CompactionJobStats* compaction_job_stats_;
|
CompactionJobStats* compaction_job_stats_;
|
||||||
|
@ -4671,6 +4671,98 @@ TEST_F(DBCompactionTest, CompactionStatsTest) {
|
|||||||
VerifyCompactionStats(*cfd, *collector);
|
VerifyCompactionStats(*cfd, *collector);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBCompactionTest, SubcompactionEvent) {
|
||||||
|
class SubCompactionEventListener : public EventListener {
|
||||||
|
public:
|
||||||
|
void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
ASSERT_EQ(running_compactions_.find(ci.job_id),
|
||||||
|
running_compactions_.end());
|
||||||
|
running_compactions_.emplace(ci.job_id, std::unordered_set<int>());
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnCompactionCompleted(DB* /*db*/,
|
||||||
|
const CompactionJobInfo& ci) override {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
auto it = running_compactions_.find(ci.job_id);
|
||||||
|
ASSERT_NE(it, running_compactions_.end());
|
||||||
|
ASSERT_EQ(it->second.size(), 0);
|
||||||
|
running_compactions_.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnSubcompactionBegin(const SubcompactionJobInfo& si) override {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
auto it = running_compactions_.find(si.job_id);
|
||||||
|
ASSERT_NE(it, running_compactions_.end());
|
||||||
|
auto r = it->second.insert(si.subcompaction_job_id);
|
||||||
|
ASSERT_TRUE(r.second); // each subcompaction_job_id should be different
|
||||||
|
total_subcompaction_cnt_++;
|
||||||
|
}
|
||||||
|
|
||||||
|
void OnSubcompactionCompleted(const SubcompactionJobInfo& si) override {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
auto it = running_compactions_.find(si.job_id);
|
||||||
|
ASSERT_NE(it, running_compactions_.end());
|
||||||
|
auto r = it->second.erase(si.subcompaction_job_id);
|
||||||
|
ASSERT_EQ(r, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t GetRunningCompactionCount() {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
return running_compactions_.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t GetTotalSubcompactionCount() {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
return total_subcompaction_cnt_;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
InstrumentedMutex mutex_;
|
||||||
|
std::unordered_map<int, std::unordered_set<int>> running_compactions_;
|
||||||
|
size_t total_subcompaction_cnt_ = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.target_file_size_base = 1024;
|
||||||
|
options.level0_file_num_compaction_trigger = 10;
|
||||||
|
auto* listener = new SubCompactionEventListener();
|
||||||
|
options.listeners.emplace_back(listener);
|
||||||
|
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// generate 4 files @ L2
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
for (int j = 0; j < 10; j++) {
|
||||||
|
int key_id = i * 10 + j;
|
||||||
|
ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
|
||||||
|
}
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
}
|
||||||
|
MoveFilesToLevel(2);
|
||||||
|
|
||||||
|
// generate 2 files @ L1 which overlaps with L2 files
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
for (int j = 0; j < 10; j++) {
|
||||||
|
int key_id = i * 20 + j * 2;
|
||||||
|
ASSERT_OK(Put(Key(key_id), "value" + ToString(key_id)));
|
||||||
|
}
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
}
|
||||||
|
MoveFilesToLevel(1);
|
||||||
|
ASSERT_EQ(FilesPerLevel(), "0,2,4");
|
||||||
|
|
||||||
|
CompactRangeOptions comp_opts;
|
||||||
|
comp_opts.max_subcompactions = 4;
|
||||||
|
Status s = dbfull()->CompactRange(comp_opts, nullptr, nullptr);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||||
|
// make sure there's no running compaction
|
||||||
|
ASSERT_EQ(listener->GetRunningCompactionCount(), 0);
|
||||||
|
// and sub compaction is triggered
|
||||||
|
ASSERT_GT(listener->GetTotalSubcompactionCount(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
|
TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
|
||||||
// LSM setup:
|
// LSM setup:
|
||||||
// L1: [ba bz]
|
// L1: [ba bz]
|
||||||
|
@ -363,6 +363,42 @@ struct CompactionFileInfo {
|
|||||||
uint64_t oldest_blob_file_number;
|
uint64_t oldest_blob_file_number;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct SubcompactionJobInfo {
|
||||||
|
~SubcompactionJobInfo() { status.PermitUncheckedError(); }
|
||||||
|
// the id of the column family where the compaction happened.
|
||||||
|
uint32_t cf_id;
|
||||||
|
// the name of the column family where the compaction happened.
|
||||||
|
std::string cf_name;
|
||||||
|
// the status indicating whether the compaction was successful or not.
|
||||||
|
Status status;
|
||||||
|
// the id of the thread that completed this compaction job.
|
||||||
|
uint64_t thread_id;
|
||||||
|
// the job id, which is unique in the same thread.
|
||||||
|
int job_id;
|
||||||
|
|
||||||
|
// sub-compaction job id, which is only unique within the same compaction, so
|
||||||
|
// use both 'job_id' and 'subcompaction_job_id' to identify a subcompaction
|
||||||
|
// within an instance.
|
||||||
|
// For non subcompaction job, it's set to -1.
|
||||||
|
int subcompaction_job_id;
|
||||||
|
// the smallest input level of the compaction.
|
||||||
|
int base_input_level;
|
||||||
|
// the output level of the compaction.
|
||||||
|
int output_level;
|
||||||
|
|
||||||
|
// Reason to run the compaction
|
||||||
|
CompactionReason compaction_reason;
|
||||||
|
|
||||||
|
// Compression algorithm used for output files
|
||||||
|
CompressionType compression;
|
||||||
|
|
||||||
|
// Statistics and other additional details on the compaction
|
||||||
|
CompactionJobStats stats;
|
||||||
|
|
||||||
|
// Compression algorithm used for blob output files.
|
||||||
|
CompressionType blob_compression_type;
|
||||||
|
};
|
||||||
|
|
||||||
struct CompactionJobInfo {
|
struct CompactionJobInfo {
|
||||||
~CompactionJobInfo() { status.PermitUncheckedError(); }
|
~CompactionJobInfo() { status.PermitUncheckedError(); }
|
||||||
// the id of the column family where the compaction happened.
|
// the id of the column family where the compaction happened.
|
||||||
@ -375,6 +411,7 @@ struct CompactionJobInfo {
|
|||||||
uint64_t thread_id;
|
uint64_t thread_id;
|
||||||
// the job id, which is unique in the same thread.
|
// the job id, which is unique in the same thread.
|
||||||
int job_id;
|
int job_id;
|
||||||
|
|
||||||
// the smallest input level of the compaction.
|
// the smallest input level of the compaction.
|
||||||
int base_input_level;
|
int base_input_level;
|
||||||
// the output level of the compaction.
|
// the output level of the compaction.
|
||||||
@ -579,6 +616,43 @@ class EventListener : public Customizable {
|
|||||||
virtual void OnCompactionCompleted(DB* /*db*/,
|
virtual void OnCompactionCompleted(DB* /*db*/,
|
||||||
const CompactionJobInfo& /*ci*/) {}
|
const CompactionJobInfo& /*ci*/) {}
|
||||||
|
|
||||||
|
// A callback function to RocksDB which will be called before a sub-compaction
|
||||||
|
// begins. If a compaction is split to 2 sub-compactions, it will trigger one
|
||||||
|
// `OnCompactionBegin()` first, then two `OnSubcompactionBegin()`.
|
||||||
|
// If compaction is not split, it will still trigger one
|
||||||
|
// `OnSubcompactionBegin()`, as internally, compaction is always handled by
|
||||||
|
// sub-compaction. The default implementation is a no-op.
|
||||||
|
//
|
||||||
|
// Note that this function must be implemented in a way such that
|
||||||
|
// it should not run for an extended period of time before the function
|
||||||
|
// returns. Otherwise, RocksDB may be blocked.
|
||||||
|
//
|
||||||
|
// @param ci a reference to a CompactionJobInfo struct, it contains a
|
||||||
|
// `sub_job_id` which is only unique within the specified compaction (which
|
||||||
|
// can be identified by `job_id`). 'ci' is released after this function is
|
||||||
|
// returned, and must be copied if it's needed outside this function.
|
||||||
|
// Note: `table_properties` is not set for sub-compaction, the information
|
||||||
|
// could be got from `OnCompactionBegin()`.
|
||||||
|
virtual void OnSubcompactionBegin(const SubcompactionJobInfo& /*si*/) {}
|
||||||
|
|
||||||
|
// A callback function to RocksDB which will be called whenever a
|
||||||
|
// sub-compaction completed. The same as `OnSubcompactionBegin()`, if a
|
||||||
|
// compaction is split to 2 sub-compactions, it will be triggered twice. If
|
||||||
|
// a compaction is not split, it will still be triggered once.
|
||||||
|
// The default implementation is a no-op.
|
||||||
|
//
|
||||||
|
// Note that this function must be implemented in a way such that
|
||||||
|
// it should not run for an extended period of time before the function
|
||||||
|
// returns. Otherwise, RocksDB may be blocked.
|
||||||
|
//
|
||||||
|
// @param ci a reference to a CompactionJobInfo struct, it contains a
|
||||||
|
// `sub_job_id` which is only unique within the specified compaction (which
|
||||||
|
// can be identified by `job_id`). 'ci' is released after this function is
|
||||||
|
// returned, and must be copied if it's needed outside this function.
|
||||||
|
// Note: `table_properties` is not set for sub-compaction, the information
|
||||||
|
// could be got from `OnCompactionCompleted()`.
|
||||||
|
virtual void OnSubcompactionCompleted(const SubcompactionJobInfo& /*si*/) {}
|
||||||
|
|
||||||
// A callback function for RocksDB which will be called whenever
|
// A callback function for RocksDB which will be called whenever
|
||||||
// a SST file is created. Different from OnCompactionCompleted and
|
// a SST file is created. Different from OnCompactionCompleted and
|
||||||
// OnFlushCompleted, this callback is designed for external logging
|
// OnFlushCompleted, this callback is designed for external logging
|
||||||
|
Loading…
Reference in New Issue
Block a user