[rocksdb] new CompactionFilterV2 API

Summary:
This diff adds a new CompactionFilterV2 API that rolls up the decisions on kv pairs during compactions. These kv pairs must share the same key prefix. They are buffered inside the db.

    typedef std::vector<Slice> SliceVector;
    virtual std::vector<bool> Filter(int level,
                                     const SliceVector& keys,
                                     const SliceVector& existing_values,
                                     std::vector<std::string>* new_values,
                                     std::vector<bool>* values_changed) const = 0;

Applications can override the Filter() function to operate on the buffered kv pairs. More details are in the inline documentation.

Test Plan: make check. Added unit tests to make sure Keep, Delete, and Change all work.

Reviewers: haobo

CCs: leveldb

Differential Revision: https://reviews.facebook.net/D15087
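For orientation, here is a minimal sketch of how an application might wire up this API, using only the types this diff introduces (CompactionFilterV2, CompactionFilterFactoryV2, Options::compaction_filter_factory_v2). The KeepAllFilterV2/KeepAllFilterFactoryV2 names and the 8-byte prefix are illustrative, not part of the commit:

    #include <memory>
    #include <string>
    #include <vector>

    #include "rocksdb/compaction_filter.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice_transform.h"

    // Illustrative filter: keep every kv-pair in the prefix batch unchanged.
    class KeepAllFilterV2 : public rocksdb::CompactionFilterV2 {
     public:
      virtual std::vector<bool> Filter(int level,
                                       const SliceVector& keys,
                                       const SliceVector& existing_values,
                                       std::vector<std::string>* new_values,
                                       std::vector<bool>* values_changed)
          const override {
        new_values->clear();
        // One values_changed entry per buffered kv-pair, none changed.
        values_changed->assign(keys.size(), false);
        // One return entry per buffered kv-pair: false = keep, true = delete.
        return std::vector<bool>(keys.size(), false);
      }
      virtual const char* Name() const override { return "KeepAllFilterV2"; }
    };

    class KeepAllFilterFactoryV2 : public rocksdb::CompactionFilterFactoryV2 {
     public:
      explicit KeepAllFilterFactoryV2(const rocksdb::SliceTransform* pe)
          : CompactionFilterFactoryV2(pe) { }
      virtual std::unique_ptr<rocksdb::CompactionFilterV2>
      CreateCompactionFilterV2(
          const rocksdb::CompactionFilterContext& context) override {
        return std::unique_ptr<rocksdb::CompactionFilterV2>(
            new KeepAllFilterV2());
      }
      virtual const char* Name() const override {
        return "KeepAllFilterFactoryV2";
      }
    };

    // Usage: batch kv-pairs by an 8-byte key prefix, as the default factory
    // in this diff does.
    //   rocksdb::Options options;
    //   options.compaction_filter_factory_v2 =
    //       std::make_shared<KeepAllFilterFactoryV2>(
    //           rocksdb::NewFixedPrefixTransform(8));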
This commit is contained in:
parent cda4006e87
commit b47812fba6
@@ -15,11 +15,13 @@
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks
  if file system state matches DB state (file existence and file sizes)
+* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2

### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,
  we will ignore it. We assume that writers of these records were interrupted
  and that we can safely ignore it.
+* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, processes them in batches, and returns the batched results back to the DB.

## 2.7.0 (01/28/2014)
517  db/db_impl.cc
@@ -70,6 +70,7 @@ namespace rocksdb {
int DBImpl::SuperVersion::dummy = 0;
void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy;
void* const DBImpl::SuperVersion::kSVObsolete = nullptr;
+const std::string kNullString = "NULL";

void DumpLeveldbBuildVersion(Logger * log);
@@ -118,12 +119,129 @@ struct DBImpl::CompactionState {
  }

  // Create a client visible context of this compaction
-  CompactionFilter::Context GetFilterContext() {
-    CompactionFilter::Context context;
+  CompactionFilterContext GetFilterContext() {
+    CompactionFilterContext context;
    context.is_full_compaction = compaction->IsFullCompaction();
    context.is_manual_compaction = compaction->IsManualCompaction();
    return context;
  }
+
+  std::vector<Slice> key_buf_;
+  std::vector<Slice> existing_value_buf_;
+  std::vector<std::string> key_str_buf_;
+  std::vector<std::string> existing_value_str_buf_;
+  // new_value_buf_ will only be appended to if a value changes
+  std::vector<std::string> new_value_buf_;
+  // if value_changed_buf_[i] is true,
+  // new_value_buf_ gets a new entry with the changed value
+  std::vector<bool> value_changed_buf_;
+  // to_delete_buf_[i] is true iff key_buf_[i] is deleted
+  std::vector<bool> to_delete_buf_;
+  // buffer for the parsed internal keys; the string storage is backed
+  // by key_str_buf_
+  std::vector<ParsedInternalKey> ikey_buf_;
+
+  std::vector<Slice> other_key_buf_;
+  std::vector<Slice> other_value_buf_;
+  std::vector<std::string> other_key_str_buf_;
+  std::vector<std::string> other_value_str_buf_;
+
+  std::vector<Slice> combined_key_buf_;
+  std::vector<Slice> combined_value_buf_;
+
+  std::string cur_prefix_;
+
+  // Buffers a kv-pair that will be run through compaction filter V2
+  // in the future.
+  void BufferKeyValueSlices(const Slice& key, const Slice& value) {
+    key_str_buf_.emplace_back(key.ToString());
+    existing_value_str_buf_.emplace_back(value.ToString());
+    key_buf_.emplace_back(Slice(key_str_buf_.back()));
+    existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back()));
+
+    ParsedInternalKey ikey;
+    ParseInternalKey(key_buf_.back(), &ikey);
+    ikey_buf_.emplace_back(ikey);
+  }
+
+  // Buffers a kv-pair that will not be run through compaction filter V2
+  // in the future.
+  void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
+    other_key_str_buf_.emplace_back(key.ToString());
+    other_value_str_buf_.emplace_back(value.ToString());
+    other_key_buf_.emplace_back(Slice(other_key_str_buf_.back()));
+    other_value_buf_.emplace_back(Slice(other_value_str_buf_.back()));
+  }
+
+  // Add a kv-pair to the combined buffer
+  void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
+    // The real strings are stored in the batch buffers
+    combined_key_buf_.emplace_back(key);
+    combined_value_buf_.emplace_back(value);
+  }
+
+  // Merge the two buffers
+  void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
+    size_t i = 0;
+    size_t j = 0;
+    size_t total_size = key_buf_.size() + other_key_buf_.size();
+    combined_key_buf_.reserve(total_size);
+    combined_value_buf_.reserve(total_size);
+
+    while (i + j < total_size) {
+      int comp_res = 0;
+      if (i < key_buf_.size() && j < other_key_buf_.size()) {
+        comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]);
+      } else if (i >= key_buf_.size() && j < other_key_buf_.size()) {
+        comp_res = 1;
+      } else if (j >= other_key_buf_.size() && i < key_buf_.size()) {
+        comp_res = -1;
+      }
+      if (comp_res > 0) {
+        AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]);
+        j++;
+      } else if (comp_res < 0) {
+        AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]);
+        i++;
+      }
+    }
+  }
+
+  void CleanupBatchBuffer() {
+    to_delete_buf_.clear();
+    key_buf_.clear();
+    existing_value_buf_.clear();
+    key_str_buf_.clear();
+    existing_value_str_buf_.clear();
+    new_value_buf_.clear();
+    value_changed_buf_.clear();
+    ikey_buf_.clear();
+
+    to_delete_buf_.shrink_to_fit();
+    key_buf_.shrink_to_fit();
+    existing_value_buf_.shrink_to_fit();
+    key_str_buf_.shrink_to_fit();
+    existing_value_str_buf_.shrink_to_fit();
+    new_value_buf_.shrink_to_fit();
+    value_changed_buf_.shrink_to_fit();
+    ikey_buf_.shrink_to_fit();
+
+    other_key_buf_.clear();
+    other_value_buf_.clear();
+    other_key_str_buf_.clear();
+    other_value_str_buf_.clear();
+    other_key_buf_.shrink_to_fit();
+    other_value_buf_.shrink_to_fit();
+    other_key_str_buf_.shrink_to_fit();
+    other_value_str_buf_.shrink_to_fit();
+  }
+
+  void CleanupMergedBuffer() {
+    combined_key_buf_.clear();
+    combined_value_buf_.clear();
+    combined_key_buf_.shrink_to_fit();
+    combined_value_buf_.shrink_to_fit();
+  }
};

// Fix user-supplied options to be reasonable
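Taken together, these CompactionState helpers implement a buffer → filter → merge cycle per prefix batch. Below is a rough standalone analogue of that flow, hedged: it uses plain std::pair<std::string, std::string> records instead of internal keys, lexicographic order stands in for the internal key comparator, and filtered-out pairs are simply dropped rather than rewritten as tombstones via UpdateInternalKey as the real code does.

    #include <cassert>
    #include <string>
    #include <utility>
    #include <vector>

    using KV = std::pair<std::string, std::string>;

    // Stand-in for CallCompactionFilterV2: drop kv-pairs with empty values.
    static std::vector<bool> FilterBatch(const std::vector<KV>& eligible) {
      std::vector<bool> to_delete;
      for (size_t i = 0; i < eligible.size(); i++) {
        to_delete.push_back(eligible[i].second.empty());
      }
      return to_delete;
    }

    int main() {
      // 1) One prefix batch, split into filter-eligible pairs and "other"
      //    (ineligible) pairs, each buffer already sorted by key.
      std::vector<KV> eligible = {{"user1:a", ""}, {"user1:c", "v"}};
      std::vector<KV> other    = {{"user1:b", "v"}};

      // 2) Run the batch filter and apply its decisions.
      std::vector<bool> to_delete = FilterBatch(eligible);
      std::vector<KV> kept;
      for (size_t i = 0; i < eligible.size(); i++) {
        if (!to_delete[i]) kept.push_back(eligible[i]);
      }

      // 3) Two-pointer merge of the two sorted buffers, as in
      //    MergeKeyValueSliceBuffer above.
      std::vector<KV> combined;
      size_t i = 0, j = 0;
      while (i < kept.size() || j < other.size()) {
        if (j >= other.size() || (i < kept.size() && kept[i] < other[j])) {
          combined.push_back(kept[i++]);
        } else {
          combined.push_back(other[j++]);
        }
      }

      // 4) combined now plays the role of combined_key_buf_/combined_value_buf_
      //    that ProcessKeyValueCompaction walks when is_compaction_v2 is true.
      assert(combined.size() == 2);
      assert(combined[0].first == "user1:b" && combined[1].first == "user1:c");
      return 0;
    }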
@@ -2401,66 +2519,27 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
  return 0;
}

-Status DBImpl::DoCompactionWork(CompactionState* compact,
-                                DeletionState& deletion_state,
-                                LogBuffer* log_buffer) {
-  assert(compact);
-  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
-  Log(options_.info_log,
-      "Compacting %d@%d + %d@%d files, score %.2f slots available %d",
-      compact->compaction->num_input_files(0),
-      compact->compaction->level(),
-      compact->compaction->num_input_files(1),
-      compact->compaction->output_level(),
-      compact->compaction->score(),
-      options_.max_background_compactions - bg_compaction_scheduled_);
-  char scratch[2345];
-  compact->compaction->Summary(scratch, sizeof(scratch));
-  Log(options_.info_log, "Compaction start summary: %s\n", scratch);
-
-  assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
-  assert(compact->builder == nullptr);
-  assert(!compact->outfile);
-
-  SequenceNumber visible_at_tip = 0;
-  SequenceNumber earliest_snapshot;
-  SequenceNumber latest_snapshot = 0;
-  snapshots_.getAll(compact->existing_snapshots);
-  if (compact->existing_snapshots.size() == 0) {
-    // optimize for fast path if there are no snapshots
-    visible_at_tip = versions_->LastSequence();
-    earliest_snapshot = visible_at_tip;
-  } else {
-    latest_snapshot = compact->existing_snapshots.back();
-    // Add the current seqno as the 'latest' virtual
-    // snapshot to the end of this list.
-    compact->existing_snapshots.push_back(versions_->LastSequence());
-    earliest_snapshot = compact->existing_snapshots[0];
-  }
-
-  // Is this compaction producing files at the bottommost level?
-  bool bottommost_level = compact->compaction->BottomMostLevel();
-
-  // Allocate the output file numbers before we release the lock
-  AllocateCompactionOutputFileNumbers(compact);
-
-  // Release mutex while we're actually doing the compaction work
-  mutex_.Unlock();
-  // flush log buffer immediately after releasing the mutex
-  log_buffer->FlushBufferToLog();
-
-  const uint64_t start_micros = env_->NowMicros();
-  unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
-  input->SeekToFirst();
+Status DBImpl::ProcessKeyValueCompaction(
+    SequenceNumber visible_at_tip,
+    SequenceNumber earliest_snapshot,
+    SequenceNumber latest_snapshot,
+    DeletionState& deletion_state,
+    bool bottommost_level,
+    int64_t& imm_micros,
+    Iterator* input,
+    CompactionState* compact,
+    bool is_compaction_v2,
+    LogBuffer* log_buffer) {
+  size_t combined_idx = 0;
  Status status;
+  std::string compaction_filter_value;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
+  std::vector<char> delete_key;  // for compaction filter
  SequenceNumber last_sequence_for_key __attribute__((unused)) =
    kMaxSequenceNumber;
  SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
-  std::string compaction_filter_value;
-  std::vector<char> delete_key;  // for compaction filter
  MergeHelper merge(user_comparator(), options_.merge_operator.get(),
                    options_.info_log.get(),
                    options_.min_partial_merge_operands,
@@ -2490,12 +2569,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
      imm_micros += (env_->NowMicros() - imm_start);
    }

-    Slice key = input->key();
-    Slice value = input->value();
+    Slice key;
+    Slice value;
+    // If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.
+    // This prefix batch should contain results after calling
+    // compaction_filter_v2.
+    //
+    // If is_compaction_v2 is off, this function will go through all the
+    // kv-pairs in input.
+    if (!is_compaction_v2) {
+      key = input->key();
+      value = input->value();
+    } else {
+      if (combined_idx >= compact->combined_key_buf_.size()) {
+        break;
+      }
+      assert(combined_idx < compact->combined_key_buf_.size());
+      key = compact->combined_key_buf_[combined_idx];
+      value = compact->combined_value_buf_[combined_idx];
+
+      ++combined_idx;
+    }
+
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
-      status = FinishCompactionOutputFile(compact, input.get());
+      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
@@ -2515,15 +2613,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key,
-                                    Slice(current_user_key)) != 0) {
+              Slice(current_user_key)) != 0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
        visible_in_snapshot = kMaxSequenceNumber;

        // apply the compaction filter to the first occurrence of the user key
-        if (compaction_filter &&
+        if (compaction_filter && !is_compaction_v2 &&
            ikey.type == kTypeValue &&
            (visible_at_tip || ikey.sequence > latest_snapshot)) {
          // If the user has specified a compaction filter and the sequence
@@ -2535,15 +2632,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
          compaction_filter_value.clear();
          bool to_delete =
            compaction_filter->Filter(compact->compaction->level(),
-                                      ikey.user_key, value,
-                                      &compaction_filter_value,
-                                      &value_changed);
+                ikey.user_key, value,
+                &compaction_filter_value,
+                &value_changed);
          if (to_delete) {
            // make a copy of the original key
            delete_key.assign(key.data(), key.data() + key.size());
            // convert it to a delete
            UpdateInternalKey(&delete_key[0], delete_key.size(),
-                              ikey.sequence, kTypeDeletion);
+                ikey.sequence, kTypeDeletion);
            // anchor the key again
            key = Slice(&delete_key[0], delete_key.size());
            // needed because ikey is backed by key
@@ -2565,8 +2662,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
      SequenceNumber visible = visible_at_tip ?
        visible_at_tip :
        findEarliestVisibleSnapshot(ikey.sequence,
-                                    compact->existing_snapshots,
-                                    &prev_snapshot);
+            compact->existing_snapshots,
+            &prev_snapshot);

      if (visible_in_snapshot == visible) {
        // If the earliest snapshot is which this key is visible in
@@ -2578,8 +2675,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
        drop = true;    // (A)
        RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY);
      } else if (ikey.type == kTypeDeletion &&
-                 ikey.sequence <= earliest_snapshot &&
-                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
+          ikey.sequence <= earliest_snapshot &&
+          compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
@@ -2596,8 +2693,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
        // object to minimize change to the existing flow. Turns out this
        // logic could also be nicely re-used for memtable flush purge
        // optimization in BuildTable.
-        merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
-                         options_.statistics.get());
+        int steps = 0;
+        merge.MergeUntil(input, prev_snapshot, bottommost_level,
+                         options_.statistics.get(), &steps);
+        // Skip the Merge ops
+        combined_idx = combined_idx - 1 + steps;
+
        current_entry_is_merging = true;
        if (merge.IsSuccess()) {
          // Successfully found Put/Delete/(end-of-key-range) while merging
@@ -2699,7 +2800,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
    // Close output file if it is big enough
    if (compact->builder->FileSize() >=
        compact->compaction->MaxOutputFileSize()) {
-      status = FinishCompactionOutputFile(compact, input.get());
+      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
@@ -2736,6 +2837,278 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
    }
  }

+  return status;
+}
+
+void DBImpl::CallCompactionFilterV2(CompactionState* compact,
+    CompactionFilterV2* compaction_filter_v2) {
+  if (compact == nullptr || compaction_filter_v2 == nullptr) {
+    return;
+  }
+
+  std::vector<Slice> user_key_buf;
+  for (const auto& key : compact->ikey_buf_) {
+    user_key_buf.emplace_back(key.user_key);
+  }
+
+  // If the user has specified a compaction filter and the sequence
+  // number is greater than any external snapshot, then invoke the
+  // filter.
+  // If the return value of the compaction filter is true, replace
+  // the entry with a delete marker.
+  compact->to_delete_buf_ = compaction_filter_v2->Filter(
+      compact->compaction->level(),
+      user_key_buf, compact->existing_value_buf_,
+      &compact->new_value_buf_,
+      &compact->value_changed_buf_);
+
+  // new_value_buf_.size() <= to_delete_buf_.size(). "=" iff every kv-pair
+  // in this compaction run has its value changed.
+  assert(compact->to_delete_buf_.size() ==
+      compact->key_buf_.size());
+  assert(compact->to_delete_buf_.size() ==
+      compact->existing_value_buf_.size());
+  assert(compact->to_delete_buf_.size() ==
+      compact->value_changed_buf_.size());
+
+  int new_value_idx = 0;
+  for (unsigned int i = 0; i < compact->to_delete_buf_.size(); ++i) {
+    if (compact->to_delete_buf_[i]) {
+      // update the string buffer directly;
+      // the Slice buffer points to the updated buffer
+      UpdateInternalKey(&compact->key_str_buf_[i][0],
+                        compact->key_str_buf_[i].size(),
+                        compact->ikey_buf_[i].sequence,
+                        kTypeDeletion);
+
+      // no value associated with delete
+      compact->existing_value_buf_[i].clear();
+      RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
+    } else if (compact->value_changed_buf_[i]) {
+      compact->existing_value_buf_[i] =
+          Slice(compact->new_value_buf_[new_value_idx++]);
+    }
+  }  // for
+}
+
+Status DBImpl::DoCompactionWork(CompactionState* compact,
+                                DeletionState& deletion_state,
+                                LogBuffer* log_buffer) {
+  assert(compact);
+  compact->CleanupBatchBuffer();
+  compact->CleanupMergedBuffer();
+  compact->cur_prefix_ = kNullString;
+
+  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
+  Log(options_.info_log,
+      "Compacting %d@%d + %d@%d files, score %.2f slots available %d",
+      compact->compaction->num_input_files(0),
+      compact->compaction->level(),
+      compact->compaction->num_input_files(1),
+      compact->compaction->output_level(),
+      compact->compaction->score(),
+      options_.max_background_compactions - bg_compaction_scheduled_);
+  char scratch[2345];
+  compact->compaction->Summary(scratch, sizeof(scratch));
+  Log(options_.info_log, "Compaction start summary: %s\n", scratch);
+
+  assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
+  assert(compact->builder == nullptr);
+  assert(!compact->outfile);
+
+  SequenceNumber visible_at_tip = 0;
+  SequenceNumber earliest_snapshot;
+  SequenceNumber latest_snapshot = 0;
+  snapshots_.getAll(compact->existing_snapshots);
+  if (compact->existing_snapshots.size() == 0) {
+    // optimize for fast path if there are no snapshots
+    visible_at_tip = versions_->LastSequence();
+    earliest_snapshot = visible_at_tip;
+  } else {
+    latest_snapshot = compact->existing_snapshots.back();
+    // Add the current seqno as the 'latest' virtual
+    // snapshot to the end of this list.
+    compact->existing_snapshots.push_back(versions_->LastSequence());
+    earliest_snapshot = compact->existing_snapshots[0];
+  }
+
+  // Is this compaction producing files at the bottommost level?
+  bool bottommost_level = compact->compaction->BottomMostLevel();
+
+  // Allocate the output file numbers before we release the lock
+  AllocateCompactionOutputFileNumbers(compact);
+
+  // Release mutex while we're actually doing the compaction work
+  mutex_.Unlock();
+
+  const uint64_t start_micros = env_->NowMicros();
+  unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
+  input->SeekToFirst();
+  shared_ptr<Iterator> backup_input(
+      versions_->MakeInputIterator(compact->compaction));
+  backup_input->SeekToFirst();
+
+  Status status;
+  ParsedInternalKey ikey;
+  std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2
+    = nullptr;
+  auto context = compact->GetFilterContext();
+  compaction_filter_from_factory_v2 =
+    options_.compaction_filter_factory_v2->CreateCompactionFilterV2(context);
+  auto compaction_filter_v2 =
+    compaction_filter_from_factory_v2.get();
+
+  // temp_backup_input always points to the start of the current buffer
+  // temp_backup_input = backup_input;
+  // iterate through input,
+  // 1) buffer ineligible keys and value keys into 2 separate buffers;
+  // 2) send value_buffer to compaction filter and alter the values;
+  // 3) merge value_buffer with ineligible_value_buffer;
+  // 4) run the modified "compaction" using the old for loop.
+  if (compaction_filter_v2) {
+    for (; backup_input->Valid() && !shutting_down_.Acquire_Load(); ) {
+      // Prioritize immutable compaction work
+      if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
+        const uint64_t imm_start = env_->NowMicros();
+        LogFlush(options_.info_log);
+        mutex_.Lock();
+        if (imm_.IsFlushPending()) {
+          FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer);
+          bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
+        }
+        mutex_.Unlock();
+        imm_micros += (env_->NowMicros() - imm_start);
+      }
+
+      Slice key = backup_input->key();
+      Slice value = backup_input->value();
+
+      const SliceTransform* transformer =
+          options_.compaction_filter_factory_v2->GetPrefixExtractor();
+      std::string key_prefix = transformer->Transform(key).ToString();
+      if (compact->cur_prefix_ == kNullString) {
+        compact->cur_prefix_ = key_prefix;
+      }
+      if (!ParseInternalKey(key, &ikey)) {
+        // log error
+        Log(options_.info_log, "Failed to parse key: %s",
+            key.ToString().c_str());
+        continue;
+      } else {
+        // If the prefix remains the same, keep buffering
+        if (key_prefix == compact->cur_prefix_) {
+          // Apply the compaction filter V2 to all the kv pairs sharing
+          // the same prefix
+          if (ikey.type == kTypeValue &&
+              (visible_at_tip || ikey.sequence > latest_snapshot)) {
+            // Buffer all keys sharing the same prefix for CompactionFilterV2
+            // Iterate through keys to check prefix
+            compact->BufferKeyValueSlices(key, value);
+          } else {
+            // buffer ineligible keys
+            compact->BufferOtherKeyValueSlices(key, value);
+          }
+          backup_input->Next();
+          continue;
+          // finish changing values for eligible keys
+        } else {
+          // Now prefix changes, this batch is done.
+          // Call compaction filter on the buffered values to change the value
+          if (compact->key_buf_.size() > 0) {
+            CallCompactionFilterV2(compact, compaction_filter_v2);
+          }
+          compact->cur_prefix_ = key_prefix;
+        }
+      }
+
+      // Merge this batch of data (values + ineligible keys)
+      compact->MergeKeyValueSliceBuffer(&internal_comparator_);
+
+      // Done buffering for the current prefix. Spit it out to disk
+      // Now just iterate through all the kv-pairs
+      status = ProcessKeyValueCompaction(
+          visible_at_tip,
+          earliest_snapshot,
+          latest_snapshot,
+          deletion_state,
+          bottommost_level,
+          imm_micros,
+          input.get(),
+          compact,
+          true,
+          log_buffer);
+
+      if (!status.ok()) {
+        break;
+      }
+
+      // After writing the kv-pairs, we can safely remove the reference
+      // to the string buffer and clean them up
+      compact->CleanupBatchBuffer();
+      compact->CleanupMergedBuffer();
+      // Buffer the key that triggers the mismatch in prefix
+      if (ikey.type == kTypeValue &&
+          (visible_at_tip || ikey.sequence > latest_snapshot)) {
+        compact->BufferKeyValueSlices(key, value);
+      } else {
+        compact->BufferOtherKeyValueSlices(key, value);
+      }
+      backup_input->Next();
+      if (!backup_input->Valid()) {
+        // If this is the single last value, we need to merge it.
+        if (compact->key_buf_.size() > 0) {
+          CallCompactionFilterV2(compact, compaction_filter_v2);
+        }
+        compact->MergeKeyValueSliceBuffer(&internal_comparator_);
+
+        status = ProcessKeyValueCompaction(
+            visible_at_tip,
+            earliest_snapshot,
+            latest_snapshot,
+            deletion_state,
+            bottommost_level,
+            imm_micros,
+            input.get(),
+            compact,
+            true,
+            log_buffer);
+
+        compact->CleanupBatchBuffer();
+        compact->CleanupMergedBuffer();
+      }
+    }  // done processing all prefix batches
+
+    // finish the last batch
+    if (compact->key_buf_.size() > 0) {
+      CallCompactionFilterV2(compact, compaction_filter_v2);
+    }
+    compact->MergeKeyValueSliceBuffer(&internal_comparator_);
+    status = ProcessKeyValueCompaction(
+        visible_at_tip,
+        earliest_snapshot,
+        latest_snapshot,
+        deletion_state,
+        bottommost_level,
+        imm_micros,
+        input.get(),
+        compact,
+        true,
+        log_buffer);
+  }  // checking for compaction filter v2
+
+  if (!compaction_filter_v2) {
+    status = ProcessKeyValueCompaction(
+        visible_at_tip,
+        earliest_snapshot,
+        latest_snapshot,
+        deletion_state,
+        bottommost_level,
+        imm_micros,
+        input.get(),
+        compact,
+        false,
+        log_buffer);
+  }
+
  if (status.ok() && shutting_down_.Acquire_Load()) {
    status = Status::ShutdownInProgress(
        "Database shutdown started during compaction");
19  db/db_impl.h
@@ -36,6 +36,7 @@ class TableCache;
class Version;
class VersionEdit;
class VersionSet;
+class CompactionFilterV2;

class DBImpl : public DB {
 public:
@@ -366,6 +367,24 @@ class DBImpl : public DB {
                              DeletionState& deletion_state,
                              LogBuffer* log_buffer);

+  // Call compaction filter if is_compaction_v2 is not true. Then iterate
+  // through input and compact the kv-pairs.
+  Status ProcessKeyValueCompaction(
+    SequenceNumber visible_at_tip,
+    SequenceNumber earliest_snapshot,
+    SequenceNumber latest_snapshot,
+    DeletionState& deletion_state,
+    bool bottommost_level,
+    int64_t& imm_micros,
+    Iterator* input,
+    CompactionState* compact,
+    bool is_compaction_v2,
+    LogBuffer* log_buffer);
+
+  // Call compaction_filter_v2->Filter() on kv-pairs in compact
+  void CallCompactionFilterV2(CompactionState* compact,
+                              CompactionFilterV2* compaction_filter_v2);
+
  Status OpenCompactionOutputFile(CompactionState* compact);
  Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
  Status InstallCompactionResults(CompactionState* compact);
254  db/db_test.cc
@@ -2434,7 +2434,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
      : check_context_(check_context) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& context) override {
+      const CompactionFilterContext& context) override {
    if (check_context_) {
      ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
@@ -2451,7 +2451,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
class DeleteFilterFactory : public CompactionFilterFactory {
 public:
  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& context) override {
+      const CompactionFilterContext& context) override {
    if (context.is_manual_compaction) {
      return std::unique_ptr<CompactionFilter>(new DeleteFilter());
    } else {
@@ -2467,7 +2467,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
  explicit ChangeFilterFactory() {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& context) override {
+      const CompactionFilterContext& context) override {
    return std::unique_ptr<CompactionFilter>(new ChangeFilter());
  }

@@ -3507,7 +3507,7 @@ TEST(DBTest, CompactionFilterWithValueChange) {

  // verify that all keys now have the new value that
  // was set by the compaction process.
-  for (int i = 0; i < 100000; i++) {
+  for (int i = 0; i < 100001; i++) {
    char key[100];
    snprintf(key, sizeof(key), "B%010d", i);
    std::string newvalue = Get(key);
@@ -3570,6 +3570,252 @@ TEST(DBTest, CompactionFilterContextManual) {
  delete iter;
}

+class KeepFilterV2 : public CompactionFilterV2 {
+ public:
+  virtual std::vector<bool> Filter(int level,
+                                   const SliceVector& keys,
+                                   const SliceVector& existing_values,
+                                   std::vector<std::string>* new_values,
+                                   std::vector<bool>* values_changed)
+    const override {
+    cfilter_count++;
+    std::vector<bool> ret;
+    new_values->clear();
+    values_changed->clear();
+    for (unsigned int i = 0; i < keys.size(); ++i) {
+      values_changed->push_back(false);
+      ret.push_back(false);
+    }
+    return ret;
+  }
+
+  virtual const char* Name() const override {
+    return "KeepFilterV2";
+  }
+};
+
+class DeleteFilterV2 : public CompactionFilterV2 {
+ public:
+  virtual std::vector<bool> Filter(int level,
+                                   const SliceVector& keys,
+                                   const SliceVector& existing_values,
+                                   std::vector<std::string>* new_values,
+                                   std::vector<bool>* values_changed)
+    const override {
+    cfilter_count++;
+    new_values->clear();
+    values_changed->clear();
+    std::vector<bool> ret;
+    for (unsigned int i = 0; i < keys.size(); ++i) {
+      values_changed->push_back(false);
+      ret.push_back(true);
+    }
+    return ret;
+  }
+
+  virtual const char* Name() const override {
+    return "DeleteFilterV2";
+  }
+};
+
+class ChangeFilterV2 : public CompactionFilterV2 {
+ public:
+  virtual std::vector<bool> Filter(int level,
+                                   const SliceVector& keys,
+                                   const SliceVector& existing_values,
+                                   std::vector<std::string>* new_values,
+                                   std::vector<bool>* values_changed)
+    const override {
+    std::vector<bool> ret;
+    new_values->clear();
+    values_changed->clear();
+    for (unsigned int i = 0; i < keys.size(); ++i) {
+      values_changed->push_back(true);
+      new_values->push_back(NEW_VALUE);
+      ret.push_back(false);
+    }
+    return ret;
+  }
+
+  virtual const char* Name() const override {
+    return "ChangeFilterV2";
+  }
+};
+
+class KeepFilterFactoryV2 : public CompactionFilterFactoryV2 {
+ public:
+  explicit KeepFilterFactoryV2(const SliceTransform* prefix_extractor)
+    : CompactionFilterFactoryV2(prefix_extractor) { }
+
+  virtual std::unique_ptr<CompactionFilterV2>
+  CreateCompactionFilterV2(
+      const CompactionFilterContext& context) override {
+    return std::unique_ptr<CompactionFilterV2>(new KeepFilterV2());
+  }
+
+  virtual const char* Name() const override {
+    return "KeepFilterFactoryV2";
+  }
+};
+
+class DeleteFilterFactoryV2 : public CompactionFilterFactoryV2 {
+ public:
+  explicit DeleteFilterFactoryV2(const SliceTransform* prefix_extractor)
+    : CompactionFilterFactoryV2(prefix_extractor) { }
+
+  virtual std::unique_ptr<CompactionFilterV2>
+  CreateCompactionFilterV2(
+      const CompactionFilterContext& context) override {
+    return std::unique_ptr<CompactionFilterV2>(new DeleteFilterV2());
+  }
+
+  virtual const char* Name() const override {
+    return "DeleteFilterFactoryV2";
+  }
+};
+
+class ChangeFilterFactoryV2 : public CompactionFilterFactoryV2 {
+ public:
+  explicit ChangeFilterFactoryV2(const SliceTransform* prefix_extractor)
+    : CompactionFilterFactoryV2(prefix_extractor) { }
+
+  virtual std::unique_ptr<CompactionFilterV2>
+  CreateCompactionFilterV2(
+      const CompactionFilterContext& context) override {
+    return std::unique_ptr<CompactionFilterV2>(new ChangeFilterV2());
+  }
+
+  virtual const char* Name() const override {
+    return "ChangeFilterFactoryV2";
+  }
+};
+
+TEST(DBTest, CompactionFilterV2) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  options.max_mem_compaction_level = 0;
+  // extract prefix
+  auto prefix_extractor = NewFixedPrefixTransform(8);
+  options.compaction_filter_factory_v2
+    = std::make_shared<KeepFilterFactoryV2>(prefix_extractor);
+  // In a testing environment, we can only flush the application
+  // compaction filter buffer using universal compaction
+  option_config_ = kUniversalCompaction;
+  options.compaction_style = (rocksdb::CompactionStyle)1;
+  Reopen(&options);
+
+  // Write 100K keys, these are written to a few files in L0.
+  const std::string value(10, 'x');
+  for (int i = 0; i < 100000; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%08d%010d", i, i);
+    Put(key, value);
+  }
+
+  dbfull()->TEST_FlushMemTable();
+
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
+  dbfull()->TEST_CompactRange(1, nullptr, nullptr);
+
+  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+
+  // All the files are in the lowest level.
+  int count = 0;
+  int total = 0;
+  Iterator* iter = dbfull()->TEST_NewInternalIterator();
+  iter->SeekToFirst();
+  ASSERT_OK(iter->status());
+  while (iter->Valid()) {
+    ParsedInternalKey ikey(Slice(), 0, kTypeValue);
+    ikey.sequence = -1;
+    ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
+    total++;
+    if (ikey.sequence != 0) {
+      count++;
+    }
+    iter->Next();
+  }
+
+  ASSERT_EQ(total, 100000);
+  // 1 snapshot only. Since we are using universal compaction,
+  // the sequence no is cleared for better compression
+  ASSERT_EQ(count, 1);
+  delete iter;
+
+  // create a new database with the compaction
+  // filter in such a way that it deletes all keys
+  options.compaction_filter_factory_v2 =
+    std::make_shared<DeleteFilterFactoryV2>(prefix_extractor);
+  options.create_if_missing = true;
+  DestroyAndReopen(&options);
+
+  // write all the keys once again.
+  for (int i = 0; i < 100000; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%08d%010d", i, i);
+    Put(key, value);
+  }
+
+  dbfull()->TEST_FlushMemTable();
+  ASSERT_NE(NumTableFilesAtLevel(0), 0);
+
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
+  dbfull()->TEST_CompactRange(1, nullptr, nullptr);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+
+  // Scan the entire database to ensure that nothing is left
+  iter = db_->NewIterator(ReadOptions());
+  iter->SeekToFirst();
+  count = 0;
+  while (iter->Valid()) {
+    count++;
+    iter->Next();
+  }
+
+  ASSERT_EQ(count, 0);
+  delete iter;
+}
+
+TEST(DBTest, CompactionFilterV2WithValueChange) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  options.max_mem_compaction_level = 0;
+  auto prefix_extractor = NewFixedPrefixTransform(8);
+  options.compaction_filter_factory_v2 =
+    std::make_shared<ChangeFilterFactoryV2>(prefix_extractor);
+  // In a testing environment, we can only flush the application
+  // compaction filter buffer using universal compaction
+  option_config_ = kUniversalCompaction;
+  options.compaction_style = (rocksdb::CompactionStyle)1;
+  Reopen(&options);
+
+  // Write 100K+1 keys, these are written to a few files
+  // in L0. We do this so that the current snapshot points
+  // to the 100001st key. The compaction filter is not invoked
+  // on keys that are visible via a snapshot because we
+  // anyways cannot delete it.
+  const std::string value(10, 'x');
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%08d%010d", i, i);
+    Put(key, value);
+  }
+
+  // push all files to lower levels
+  dbfull()->TEST_FlushMemTable();
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
+  dbfull()->TEST_CompactRange(1, nullptr, nullptr);
+
+  // verify that all keys now have the new value that
+  // was set by the compaction process.
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%08d%010d", i, i);
+    std::string newvalue = Get(key);
+    ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
+  }
+}
+
TEST(DBTest, SparseMerge) {
  do {
    Options options = CurrentOptions();
db/merge_helper.cc
@@ -21,7 +21,7 @@ namespace rocksdb {
// operands_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to operands_[i] for each i.
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
-                             bool at_bottom, Statistics* stats) {
+                             bool at_bottom, Statistics* stats, int* steps) {
  // Get a copy of the internal key, before it's invalidated by iter->Next()
  // Also maintain the list of merge operands seen.
  keys_.clear();
@@ -42,6 +42,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
  bool hit_the_next_user_key = false;
  ParsedInternalKey ikey;
  std::string merge_result;  // Temporary value for merge results
+  if (steps) {
+    ++(*steps);
+  }
  for (iter->Next(); iter->Valid(); iter->Next()) {
    assert(operands_.size() >= 1);  // Should be invariants!
    assert(keys_.size() == operands_.size());
@@ -91,6 +94,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,

      // move iter to the next entry (before doing anything else)
      iter->Next();
+      if (steps) {
+        ++(*steps);
+      }
      return;
    }
@@ -119,6 +125,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,

      // move iter to the next entry
      iter->Next();
+      if (steps) {
+        ++(*steps);
+      }
      return;
    }
@@ -133,6 +142,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
      // request or later did a partial merge.
      keys_.push_front(iter->key().ToString());
      operands_.push_front(iter->value().ToString());
+      if (steps) {
+        ++(*steps);
+      }
    }
  }
db/merge_helper.h
@@ -47,7 +47,8 @@ class MergeHelper {
  // at_bottom: (IN) true if the iterator covers the bottom level, which means
  //            we could reach the start of the history of this user key.
  void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
-                  bool at_bottom = false, Statistics* stats = nullptr);
+                  bool at_bottom = false, Statistics* stats = nullptr,
+                  int* steps = nullptr);

  // Query the merge result
  // These are valid until the next MergeUntil call
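A note on the new steps out-parameter: MergeUntil increments *steps roughly once per input entry the merge consumes (once up front for the entry it starts from, plus once per additional iterator advance). The V2 path needs this because it walks a parallel combined_key_buf_ index; since combined_idx has already been advanced past the current entry, the caller skips the remaining consumed entries with steps - 1. Caller-side pattern, as used in the db/db_impl.cc hunk above:

    int steps = 0;
    merge.MergeUntil(input, prev_snapshot, bottommost_level,
                     options_.statistics.get(), &steps);
    // Skip the Merge ops
    combined_idx = combined_idx - 1 + steps;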
include/rocksdb/compaction_filter.h
@@ -10,26 +10,27 @@
#define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_

#include <string>
+#include <vector>

namespace rocksdb {

class Slice;
+class SliceTransform;

+// Context information of a compaction run
+struct CompactionFilterContext {
+  // Does this compaction run include all data files
+  bool is_full_compaction;
+  // Is this compaction requested by the client (true),
+  // or is it occurring as an automatic compaction process
+  bool is_manual_compaction;
+};
+
// CompactionFilter allows an application to modify/delete a key-value at
// the time of compaction.

class CompactionFilter {
 public:
-  // Context information of a compaction run
-  struct Context {
-    // Does this compaction run include all data files
-    bool is_full_compaction;
-    // Is this compaction requested by the client (true),
-    // or is it occurring as an automatic compaction process
-    bool is_manual_compaction;
-  };

  virtual ~CompactionFilter() {}

  // The compaction process invokes this
@@ -64,14 +65,47 @@ class CompactionFilter {
  virtual const char* Name() const = 0;
};

+// CompactionFilterV2 buffers kv pairs sharing the same prefix and lets the
+// application layer make individual decisions for all the kv pairs in the
+// buffer.
+class CompactionFilterV2 {
+ public:
+  virtual ~CompactionFilterV2() {}
+
+  // The compaction process invokes this method for all the kv pairs
+  // sharing the same prefix. It is a "roll-up" version of CompactionFilter.
+  //
+  // Each entry in the return vector indicates if the corresponding kv should
+  // be preserved in the output of this compaction run. The application can
+  // inspect the existing values of the keys and make decisions based on them.
+  //
+  // When a value is to be preserved, the application has the option
+  // to modify the entry in existing_values and pass it back through an entry
+  // in new_values. A corresponding values_changed entry needs to be set to
+  // true in this case. Note that the new_values vector contains only changed
+  // values, i.e. new_values.size() <= values_changed.size().
+  //
+  typedef std::vector<Slice> SliceVector;
+  virtual std::vector<bool> Filter(int level,
+                                   const SliceVector& keys,
+                                   const SliceVector& existing_values,
+                                   std::vector<std::string>* new_values,
+                                   std::vector<bool>* values_changed)
+    const = 0;
+
+  // Returns a name that identifies this compaction filter.
+  // The name will be printed to LOG file on start up for diagnosis.
+  virtual const char* Name() const = 0;
+};
+
// Each compaction will create a new CompactionFilter allowing the
// application to know about different compactions
class CompactionFilterFactory {
 public:
-  virtual ~CompactionFilterFactory() { };
+  virtual ~CompactionFilterFactory() { }

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-    const CompactionFilter::Context& context) = 0;
+    const CompactionFilterContext& context) = 0;

  // Returns a name that identifies this compaction filter factory.
  virtual const char* Name() const = 0;
@@ -82,7 +116,7 @@ class CompactionFilterFactory {
class DefaultCompactionFilterFactory : public CompactionFilterFactory {
 public:
  virtual std::unique_ptr<CompactionFilter>
-  CreateCompactionFilter(const CompactionFilter::Context& context) override {
+  CreateCompactionFilter(const CompactionFilterContext& context) override {
    return std::unique_ptr<CompactionFilter>(nullptr);
  }

@@ -91,6 +125,65 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory {
  }
};

+// Each compaction will create a new CompactionFilterV2
+//
+// CompactionFilterFactoryV2 enables the application to specify a prefix and
+// use CompactionFilterV2 to filter kv-pairs in batches. Each batch contains
+// all the kv-pairs sharing the same prefix.
+//
+// This is useful for applications that require grouping kv-pairs in
+// compaction filter to make a purge/no-purge decision. For example, the key
+// prefix could be a user id, with the rest of the key representing the type
+// of value. This batching filter will come in handy if the application's
+// compaction filter requires knowledge of all types of values for any user
+// id.
+//
+class CompactionFilterFactoryV2 {
+ public:
+  explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor)
+    : prefix_extractor_(prefix_extractor) { }
+
+  virtual ~CompactionFilterFactoryV2() { }
+
+  virtual std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2(
+    const CompactionFilterContext& context) = 0;
+
+  // Returns a name that identifies this compaction filter factory.
+  virtual const char* Name() const = 0;
+
+  const SliceTransform* GetPrefixExtractor() const {
+    return prefix_extractor_;
+  }
+
+  void SetPrefixExtractor(const SliceTransform* prefix_extractor) {
+    prefix_extractor_ = prefix_extractor;
+  }
+
+ private:
+  // Prefix extractor for compaction filter v2
+  // Keys sharing the same prefix will be buffered internally.
+  // Client can implement a Filter callback function to operate on the buffer
+  const SliceTransform* prefix_extractor_;
+};
+
+// Default implementation of CompactionFilterFactoryV2 which does not
+// return any filter
+class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 {
+ public:
+  explicit DefaultCompactionFilterFactoryV2(
+      const SliceTransform* prefix_extractor)
+    : CompactionFilterFactoryV2(prefix_extractor) { }
+
+  virtual std::unique_ptr<CompactionFilterV2>
+  CreateCompactionFilterV2(
+      const CompactionFilterContext& context) override {
+    return std::unique_ptr<CompactionFilterV2>(nullptr);
+  }
+
+  virtual const char* Name() const override {
+    return "DefaultCompactionFilterFactoryV2";
+  }
+};
+
}  // namespace rocksdb

#endif  // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
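To make the new_values/values_changed contract above concrete, here is a hedged example (not part of this header; the class name and the 4-byte threshold are invented for illustration): a filter that keeps every key but rewrites values shorter than four bytes, so new_values receives exactly one entry per position where values_changed is set to true.

    #include <cctype>
    #include <string>
    #include <vector>

    #include "rocksdb/compaction_filter.h"
    #include "rocksdb/slice.h"

    class UppercaseShortValuesFilterV2 : public rocksdb::CompactionFilterV2 {
     public:
      virtual std::vector<bool> Filter(int level,
                                       const SliceVector& keys,
                                       const SliceVector& existing_values,
                                       std::vector<std::string>* new_values,
                                       std::vector<bool>* values_changed)
          const override {
        std::vector<bool> to_delete(keys.size(), false);  // keep every key
        new_values->clear();
        values_changed->clear();
        for (unsigned int i = 0; i < keys.size(); ++i) {
          if (existing_values[i].size() < 4) {
            std::string v = existing_values[i].ToString();
            for (size_t j = 0; j < v.size(); ++j) {
              v[j] = static_cast<char>(
                  toupper(static_cast<unsigned char>(v[j])));
            }
            values_changed->push_back(true);
            new_values->push_back(v);          // one entry per changed position
          } else {
            values_changed->push_back(false);  // no matching new_values entry
          }
        }
        return to_delete;
      }

      virtual const char* Name() const override {
        return "UppercaseShortValuesFilterV2";
      }
    };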
include/rocksdb/env.h
@@ -214,7 +214,7 @@ class Env {
  virtual void StartThread(void (*function)(void* arg), void* arg) = 0;

  // Wait for all threads started by StartThread to terminate.
-  virtual void WaitForJoin() = 0;
+  virtual void WaitForJoin() {}

  // Get thread pool queue length for specific thread pool.
  virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
include/rocksdb/options.h
@@ -22,6 +22,7 @@ namespace rocksdb {
class Cache;
class CompactionFilter;
class CompactionFilterFactory;
+class CompactionFilterFactoryV2;
class Comparator;
class Env;
enum InfoLogLevel : unsigned char;
@@ -123,6 +124,10 @@
  // Default: a factory that doesn't provide any object
  std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;

+  // Version TWO of the compaction_filter_factory
+  // It supports rolling compaction
+  std::shared_ptr<CompactionFilterFactoryV2> compaction_filter_factory_v2;
+
  // If true, the database will be created if it is missing.
  // Default: false
  bool create_if_missing;
util/options.cc
@@ -32,6 +32,9 @@ Options::Options()
      compaction_filter(nullptr),
      compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>(
          new DefaultCompactionFilterFactory())),
+      compaction_filter_factory_v2(
+          new DefaultCompactionFilterFactoryV2(
+              NewFixedPrefixTransform(8))),
      create_if_missing(false),
      error_if_exists(false),
      paranoid_checks(false),
@@ -131,6 +134,8 @@ Options::Dump(Logger* log) const
      compaction_filter ? compaction_filter->Name() : "None");
  Log(log, "        Options.compaction_filter_factory: %s",
      compaction_filter_factory->Name());
+  Log(log, "     Options.compaction_filter_factory_v2: %s",
+      compaction_filter_factory_v2->Name());
  Log(log, "                 Options.memtable_factory: %s",
      memtable_factory->Name());
  Log(log, "                    Options.table_factory: %s", table_factory->Name());
utilities/ttl/db_ttl.h
@@ -192,7 +192,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
      user_comp_filter_factory_(comp_filter_factory) { }

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& context) {
+      const CompactionFilterContext& context) {
    return std::unique_ptr<TtlCompactionFilter>(
        new TtlCompactionFilter(
            ttl_,
utilities/ttl/ttl_test.cc
@@ -285,7 +285,7 @@ class TtlTest {

  virtual std::unique_ptr<CompactionFilter>
  CreateCompactionFilter(
-      const CompactionFilter::Context& context) override {
+      const CompactionFilterContext& context) override {
    return std::unique_ptr<CompactionFilter>(
        new TestFilter(kSampleSize_, kNewValue_));
  }