[rocksdb] new CompactionFilterV2 API
Summary: This diff adds a new CompactionFilterV2 API that rolls up the filtering decisions for kv pairs during compactions. These kv pairs must share the same key prefix. They are buffered inside the db. typedef std::vector<Slice> SliceVector; virtual std::vector<bool> Filter(int level, const SliceVector& keys, const SliceVector& existing_values, std::vector<std::string>* new_values, std::vector<bool>* values_changed ) const = 0; Applications can override the Filter() function to operate on the buffered kv pairs. More details in the inline documentation. Test Plan: make check. Added unit tests to make sure Keep, Delete, and Change all work. Reviewers: haobo CCs: leveldb Differential Revision: https://reviews.facebook.net/D15087
This commit is contained in:
parent
cda4006e87
commit
b47812fba6
@ -15,11 +15,13 @@
|
|||||||
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
|
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
|
||||||
* Added a command "checkconsistency" in ldb tool, which checks
|
* Added a command "checkconsistency" in ldb tool, which checks
|
||||||
if file system state matches DB state (file existence and file sizes)
|
if file system state matches DB state (file existence and file sizes)
|
||||||
|
* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2
|
||||||
|
|
||||||
### New Features
|
### New Features
|
||||||
* If we find one truncated record at the end of the MANIFEST or WAL files,
|
* If we find one truncated record at the end of the MANIFEST or WAL files,
|
||||||
we will ignore it. We assume that writers of these records were interrupted
|
we will ignore it. We assume that writers of these records were interrupted
|
||||||
and that we can safely ignore it.
|
and that we can safely ignore it.
|
||||||
|
* Compaction filter now has a V2 interface. It buffers the kv-pairs sharing the same key prefix, processes them in batches, and returns the batched results back to the DB.
|
||||||
|
|
||||||
## 2.7.0 (01/28/2014)
|
## 2.7.0 (01/28/2014)
|
||||||
|
|
||||||
|
495
db/db_impl.cc
495
db/db_impl.cc
@ -70,6 +70,7 @@ namespace rocksdb {
|
|||||||
int DBImpl::SuperVersion::dummy = 0;
|
int DBImpl::SuperVersion::dummy = 0;
|
||||||
void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy;
|
void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy;
|
||||||
void* const DBImpl::SuperVersion::kSVObsolete = nullptr;
|
void* const DBImpl::SuperVersion::kSVObsolete = nullptr;
|
||||||
|
const std::string kNullString = "NULL";
|
||||||
|
|
||||||
void DumpLeveldbBuildVersion(Logger * log);
|
void DumpLeveldbBuildVersion(Logger * log);
|
||||||
|
|
||||||
@ -118,12 +119,129 @@ struct DBImpl::CompactionState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a client visible context of this compaction
|
// Create a client visible context of this compaction
|
||||||
CompactionFilter::Context GetFilterContext() {
|
CompactionFilterContext GetFilterContext() {
|
||||||
CompactionFilter::Context context;
|
CompactionFilterContext context;
|
||||||
context.is_full_compaction = compaction->IsFullCompaction();
|
context.is_full_compaction = compaction->IsFullCompaction();
|
||||||
context.is_manual_compaction = compaction->IsManualCompaction();
|
context.is_manual_compaction = compaction->IsManualCompaction();
|
||||||
return context;
|
return context;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Slice views of the keys buffered for compaction filter V2; each view
// points at the string with the same index in key_str_buf_.
std::vector<Slice> key_buf_;
|
||||||
|
// Slice views of the values buffered for compaction filter V2; initially
// each view points at the string with the same index in
// existing_value_str_buf_.
std::vector<Slice> existing_value_buf_;
|
||||||
|
// Owned copies of the buffered keys (backing store for key_buf_).
std::vector<std::string> key_str_buf_;
|
||||||
|
// Owned copies of the buffered values (backing store for
// existing_value_buf_).
std::vector<std::string> existing_value_str_buf_;
|
||||||
|
// Filled by the V2 filter: a value is appended only when an entry's value changes
|
||||||
|
std::vector<std::string> new_value_buf_;
|
||||||
|
// Filled by the V2 filter: if value_changed_buf_[i] is true,
|
||||||
|
// new_value_buf_ holds an entry with the changed value for key_buf_[i]
|
||||||
|
std::vector<bool> value_changed_buf_;
|
||||||
|
// Returned by the V2 filter: to_delete_buf_[i] is true iff key_buf_[i] is deleted
|
||||||
|
std::vector<bool> to_delete_buf_;
|
||||||
|
// Parsed form of each key in key_buf_; the Slices inside the parsed keys
|
||||||
|
// point into the strings held by key_str_buf_
|
||||||
|
std::vector<ParsedInternalKey> ikey_buf_;
|
||||||
|
|
||||||
|
// Slice views of the keys that are NOT run through compaction filter V2;
// each view points at the string with the same index in other_key_str_buf_.
std::vector<Slice> other_key_buf_;
|
||||||
|
// Slice views of the values that belong to other_key_buf_.
std::vector<Slice> other_value_buf_;
|
||||||
|
// Owned copies of the unfiltered keys (backing store for other_key_buf_).
std::vector<std::string> other_key_str_buf_;
|
||||||
|
// Owned copies of the unfiltered values (backing store for
// other_value_buf_).
std::vector<std::string> other_value_str_buf_;
|
||||||
|
|
||||||
|
// Keys of the filtered and unfiltered buffers merged into one sequence;
// these are views only, the bytes stay in the batch buffers above.
std::vector<Slice> combined_key_buf_;
|
||||||
|
// Values that belong to combined_key_buf_, in the same order.
std::vector<Slice> combined_value_buf_;
|
||||||
|
|
||||||
|
// Key prefix of the batch that is currently being buffered.
std::string cur_prefix_;
|
||||||
|
|
||||||
|
// Buffers the kv-pair that will be run through compaction filter V2
|
||||||
|
// in the future.
|
||||||
|
void BufferKeyValueSlices(const Slice& key, const Slice& value) {
  // Owned copies go into key_str_buf_ / existing_value_str_buf_; key_buf_,
  // existing_value_buf_ and ikey_buf_ only hold views into those copies.
  //
  // Growing a std::vector<std::string> moves its strings. A short string may
  // keep its bytes inside the string object itself (small-string
  // optimization), so a view taken before the vector reallocated can be left
  // dangling. An append reallocates exactly when size() == capacity()
  // beforehand; in that case every view into the buffer is rebuilt.
  //
  // NOTE(review): rebuilding re-points existing_value_buf_ at the original
  // values, so this must run before the filter results are applied to the
  // batch -- which is how the visible caller uses it; confirm for new callers.
  const bool keys_relocate = key_str_buf_.size() == key_str_buf_.capacity();
  const bool values_relocate =
      existing_value_str_buf_.size() == existing_value_str_buf_.capacity();

  key_str_buf_.emplace_back(key.ToString());
  existing_value_str_buf_.emplace_back(value.ToString());

  // Key views and the parsed keys that are backed by them.
  if (keys_relocate) {
    key_buf_.clear();
    ikey_buf_.clear();
  }
  const size_t first_key = keys_relocate ? 0 : key_str_buf_.size() - 1;
  for (size_t k = first_key; k < key_str_buf_.size(); ++k) {
    key_buf_.emplace_back(Slice(key_str_buf_[k]));

    ParsedInternalKey ikey;
    ParseInternalKey(key_buf_.back(), &ikey);
    ikey_buf_.emplace_back(ikey);
  }

  // Value views.
  if (values_relocate) {
    existing_value_buf_.clear();
  }
  const size_t first_value =
      values_relocate ? 0 : existing_value_str_buf_.size() - 1;
  for (size_t v = first_value; v < existing_value_str_buf_.size(); ++v) {
    existing_value_buf_.emplace_back(Slice(existing_value_str_buf_[v]));
  }
}
|
||||||
|
|
||||||
|
// Buffers the kv-pair that will not be run through compaction filter V2
|
||||||
|
// in the future.
|
||||||
|
void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
  // Owned copies go into other_key_str_buf_ / other_value_str_buf_;
  // other_key_buf_ and other_value_buf_ only hold views into those copies.
  //
  // Growing a std::vector<std::string> moves its strings, and a short string
  // may keep its bytes inside the string object itself (small-string
  // optimization), so views taken before a reallocation can dangle. An append
  // reallocates exactly when size() == capacity() beforehand; in that case
  // every view into the buffer is rebuilt from the relocated strings.
  const bool keys_relocate =
      other_key_str_buf_.size() == other_key_str_buf_.capacity();
  const bool values_relocate =
      other_value_str_buf_.size() == other_value_str_buf_.capacity();

  other_key_str_buf_.emplace_back(key.ToString());
  other_value_str_buf_.emplace_back(value.ToString());

  if (keys_relocate) {
    other_key_buf_.clear();
  }
  const size_t first_key = keys_relocate ? 0 : other_key_str_buf_.size() - 1;
  for (size_t k = first_key; k < other_key_str_buf_.size(); ++k) {
    other_key_buf_.emplace_back(Slice(other_key_str_buf_[k]));
  }

  if (values_relocate) {
    other_value_buf_.clear();
  }
  const size_t first_value =
      values_relocate ? 0 : other_value_str_buf_.size() - 1;
  for (size_t v = first_value; v < other_value_str_buf_.size(); ++v) {
    other_value_buf_.emplace_back(Slice(other_value_str_buf_[v]));
  }
}
|
||||||
|
|
||||||
|
// Add a kv-pair to the combined buffer
|
||||||
|
void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
  // Only the views are recorded here; the bytes they refer to remain owned
  // by the batch buffers.
  combined_value_buf_.push_back(value);
  combined_key_buf_.push_back(key);
}
|
||||||
|
|
||||||
|
// Merging the two buffers
|
||||||
|
void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
|
||||||
|
size_t i = 0;
|
||||||
|
size_t j = 0;
|
||||||
|
size_t total_size = key_buf_.size() + other_key_buf_.size();
|
||||||
|
combined_key_buf_.reserve(total_size);
|
||||||
|
combined_value_buf_.reserve(total_size);
|
||||||
|
|
||||||
|
while (i + j < total_size) {
|
||||||
|
int comp_res = 0;
|
||||||
|
if (i < key_buf_.size() && j < other_key_buf_.size()) {
|
||||||
|
comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]);
|
||||||
|
} else if (i >= key_buf_.size() && j < other_key_buf_.size()) {
|
||||||
|
comp_res = 1;
|
||||||
|
} else if (j >= other_key_buf_.size() && i < key_buf_.size()) {
|
||||||
|
comp_res = -1;
|
||||||
|
}
|
||||||
|
if (comp_res > 0) {
|
||||||
|
AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]);
|
||||||
|
j++;
|
||||||
|
} else if (comp_res < 0) {
|
||||||
|
AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]);
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void CleanupBatchBuffer() {
|
||||||
|
to_delete_buf_.clear();
|
||||||
|
key_buf_.clear();
|
||||||
|
existing_value_buf_.clear();
|
||||||
|
key_str_buf_.clear();
|
||||||
|
existing_value_str_buf_.clear();
|
||||||
|
new_value_buf_.clear();
|
||||||
|
value_changed_buf_.clear();
|
||||||
|
ikey_buf_.clear();
|
||||||
|
|
||||||
|
to_delete_buf_.shrink_to_fit();
|
||||||
|
key_buf_.shrink_to_fit();
|
||||||
|
existing_value_buf_.shrink_to_fit();
|
||||||
|
key_str_buf_.shrink_to_fit();
|
||||||
|
existing_value_str_buf_.shrink_to_fit();
|
||||||
|
new_value_buf_.shrink_to_fit();
|
||||||
|
value_changed_buf_.shrink_to_fit();
|
||||||
|
ikey_buf_.shrink_to_fit();
|
||||||
|
|
||||||
|
other_key_buf_.clear();
|
||||||
|
other_value_buf_.clear();
|
||||||
|
other_key_str_buf_.clear();
|
||||||
|
other_value_str_buf_.clear();
|
||||||
|
other_key_buf_.shrink_to_fit();
|
||||||
|
other_value_buf_.shrink_to_fit();
|
||||||
|
other_key_str_buf_.shrink_to_fit();
|
||||||
|
other_value_str_buf_.shrink_to_fit();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Empties the merged key/value view buffers and requests that both vectors
// release their unused capacity.
void CleanupMergedBuffer() {
  combined_key_buf_.clear();
  combined_key_buf_.shrink_to_fit();

  combined_value_buf_.clear();
  combined_value_buf_.shrink_to_fit();
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Fix user-supplied options to be reasonable
|
// Fix user-supplied options to be reasonable
|
||||||
@ -2401,66 +2519,27 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::DoCompactionWork(CompactionState* compact,
|
Status DBImpl::ProcessKeyValueCompaction(
|
||||||
|
SequenceNumber visible_at_tip,
|
||||||
|
SequenceNumber earliest_snapshot,
|
||||||
|
SequenceNumber latest_snapshot,
|
||||||
DeletionState& deletion_state,
|
DeletionState& deletion_state,
|
||||||
|
bool bottommost_level,
|
||||||
|
int64_t& imm_micros,
|
||||||
|
Iterator* input,
|
||||||
|
CompactionState* compact,
|
||||||
|
bool is_compaction_v2,
|
||||||
LogBuffer* log_buffer) {
|
LogBuffer* log_buffer) {
|
||||||
assert(compact);
|
size_t combined_idx = 0;
|
||||||
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
|
||||||
Log(options_.info_log,
|
|
||||||
"Compacting %d@%d + %d@%d files, score %.2f slots available %d",
|
|
||||||
compact->compaction->num_input_files(0),
|
|
||||||
compact->compaction->level(),
|
|
||||||
compact->compaction->num_input_files(1),
|
|
||||||
compact->compaction->output_level(),
|
|
||||||
compact->compaction->score(),
|
|
||||||
options_.max_background_compactions - bg_compaction_scheduled_);
|
|
||||||
char scratch[2345];
|
|
||||||
compact->compaction->Summary(scratch, sizeof(scratch));
|
|
||||||
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
|
|
||||||
|
|
||||||
assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
|
|
||||||
assert(compact->builder == nullptr);
|
|
||||||
assert(!compact->outfile);
|
|
||||||
|
|
||||||
SequenceNumber visible_at_tip = 0;
|
|
||||||
SequenceNumber earliest_snapshot;
|
|
||||||
SequenceNumber latest_snapshot = 0;
|
|
||||||
snapshots_.getAll(compact->existing_snapshots);
|
|
||||||
if (compact->existing_snapshots.size() == 0) {
|
|
||||||
// optimize for fast path if there are no snapshots
|
|
||||||
visible_at_tip = versions_->LastSequence();
|
|
||||||
earliest_snapshot = visible_at_tip;
|
|
||||||
} else {
|
|
||||||
latest_snapshot = compact->existing_snapshots.back();
|
|
||||||
// Add the current seqno as the 'latest' virtual
|
|
||||||
// snapshot to the end of this list.
|
|
||||||
compact->existing_snapshots.push_back(versions_->LastSequence());
|
|
||||||
earliest_snapshot = compact->existing_snapshots[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
// Is this compaction producing files at the bottommost level?
|
|
||||||
bool bottommost_level = compact->compaction->BottomMostLevel();
|
|
||||||
|
|
||||||
// Allocate the output file numbers before we release the lock
|
|
||||||
AllocateCompactionOutputFileNumbers(compact);
|
|
||||||
|
|
||||||
// Release mutex while we're actually doing the compaction work
|
|
||||||
mutex_.Unlock();
|
|
||||||
// flush log buffer immediately after releasing the mutex
|
|
||||||
log_buffer->FlushBufferToLog();
|
|
||||||
|
|
||||||
const uint64_t start_micros = env_->NowMicros();
|
|
||||||
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
|
|
||||||
input->SeekToFirst();
|
|
||||||
Status status;
|
Status status;
|
||||||
|
std::string compaction_filter_value;
|
||||||
ParsedInternalKey ikey;
|
ParsedInternalKey ikey;
|
||||||
std::string current_user_key;
|
std::string current_user_key;
|
||||||
bool has_current_user_key = false;
|
bool has_current_user_key = false;
|
||||||
|
std::vector<char> delete_key; // for compaction filter
|
||||||
SequenceNumber last_sequence_for_key __attribute__((unused)) =
|
SequenceNumber last_sequence_for_key __attribute__((unused)) =
|
||||||
kMaxSequenceNumber;
|
kMaxSequenceNumber;
|
||||||
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
|
||||||
std::string compaction_filter_value;
|
|
||||||
std::vector<char> delete_key; // for compaction filter
|
|
||||||
MergeHelper merge(user_comparator(), options_.merge_operator.get(),
|
MergeHelper merge(user_comparator(), options_.merge_operator.get(),
|
||||||
options_.info_log.get(),
|
options_.info_log.get(),
|
||||||
options_.min_partial_merge_operands,
|
options_.min_partial_merge_operands,
|
||||||
@ -2490,12 +2569,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
imm_micros += (env_->NowMicros() - imm_start);
|
imm_micros += (env_->NowMicros() - imm_start);
|
||||||
}
|
}
|
||||||
|
|
||||||
Slice key = input->key();
|
Slice key;
|
||||||
Slice value = input->value();
|
Slice value;
|
||||||
|
// If is_compaction_v2 is on, kv-pairs are reset to the prefix batch.
|
||||||
|
// This prefix batch should contain results after calling
|
||||||
|
// compaction_filter_v2.
|
||||||
|
//
|
||||||
|
// If is_compaction_v2 is off, this function will go through all the
|
||||||
|
// kv-pairs in input.
|
||||||
|
if (!is_compaction_v2) {
|
||||||
|
key = input->key();
|
||||||
|
value = input->value();
|
||||||
|
} else {
|
||||||
|
if (combined_idx >= compact->combined_key_buf_.size()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
assert(combined_idx < compact->combined_key_buf_.size());
|
||||||
|
key = compact->combined_key_buf_[combined_idx];
|
||||||
|
value = compact->combined_value_buf_[combined_idx];
|
||||||
|
|
||||||
|
++combined_idx;
|
||||||
|
}
|
||||||
|
|
||||||
if (compact->compaction->ShouldStopBefore(key) &&
|
if (compact->compaction->ShouldStopBefore(key) &&
|
||||||
compact->builder != nullptr) {
|
compact->builder != nullptr) {
|
||||||
status = FinishCompactionOutputFile(compact, input.get());
|
status = FinishCompactionOutputFile(compact, input);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2521,9 +2619,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
has_current_user_key = true;
|
has_current_user_key = true;
|
||||||
last_sequence_for_key = kMaxSequenceNumber;
|
last_sequence_for_key = kMaxSequenceNumber;
|
||||||
visible_in_snapshot = kMaxSequenceNumber;
|
visible_in_snapshot = kMaxSequenceNumber;
|
||||||
|
|
||||||
// apply the compaction filter to the first occurrence of the user key
|
// apply the compaction filter to the first occurrence of the user key
|
||||||
if (compaction_filter &&
|
if (compaction_filter && !is_compaction_v2 &&
|
||||||
ikey.type == kTypeValue &&
|
ikey.type == kTypeValue &&
|
||||||
(visible_at_tip || ikey.sequence > latest_snapshot)) {
|
(visible_at_tip || ikey.sequence > latest_snapshot)) {
|
||||||
// If the user has specified a compaction filter and the sequence
|
// If the user has specified a compaction filter and the sequence
|
||||||
@ -2596,8 +2693,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
// object to minimize change to the existing flow. Turn out this
|
// object to minimize change to the existing flow. Turn out this
|
||||||
// logic could also be nicely re-used for memtable flush purge
|
// logic could also be nicely re-used for memtable flush purge
|
||||||
// optimization in BuildTable.
|
// optimization in BuildTable.
|
||||||
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
|
int steps = 0;
|
||||||
options_.statistics.get());
|
merge.MergeUntil(input, prev_snapshot, bottommost_level,
|
||||||
|
options_.statistics.get(), &steps);
|
||||||
|
// Skip the Merge ops
|
||||||
|
combined_idx = combined_idx - 1 + steps;
|
||||||
|
|
||||||
current_entry_is_merging = true;
|
current_entry_is_merging = true;
|
||||||
if (merge.IsSuccess()) {
|
if (merge.IsSuccess()) {
|
||||||
// Successfully found Put/Delete/(end-of-key-range) while merging
|
// Successfully found Put/Delete/(end-of-key-range) while merging
|
||||||
@ -2699,7 +2800,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
// Close output file if it is big enough
|
// Close output file if it is big enough
|
||||||
if (compact->builder->FileSize() >=
|
if (compact->builder->FileSize() >=
|
||||||
compact->compaction->MaxOutputFileSize()) {
|
compact->compaction->MaxOutputFileSize()) {
|
||||||
status = FinishCompactionOutputFile(compact, input.get());
|
status = FinishCompactionOutputFile(compact, input);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -2736,6 +2837,278 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runs the batched (V2) compaction filter over the kv-pairs buffered in
// `compact` and applies its decisions to those buffers in place.
//
// compact:              holds the buffered batch (key_buf_, ikey_buf_,
//                       existing_value_buf_, ...) and receives the results.
// compaction_filter_v2: filter to invoke.
//
// Does nothing when either pointer is null.
void DBImpl::CallCompactionFilterV2(CompactionState* compact,
                                    CompactionFilterV2* compaction_filter_v2) {
  if (compact == nullptr || compaction_filter_v2 == nullptr) {
    return;
  }

  // The filter is given the user-key part of every buffered internal key.
  std::vector<Slice> user_key_buf;
  user_key_buf.reserve(compact->ikey_buf_.size());
  for (const auto& key : compact->ikey_buf_) {
    user_key_buf.emplace_back(key.user_key);
  }

  // One call for the whole batch. The returned vector marks the entries to
  // delete; new_value_buf_ and value_changed_buf_ are filled by the filter.
  compact->to_delete_buf_ = compaction_filter_v2->Filter(
      compact->compaction->level(),
      user_key_buf, compact->existing_value_buf_,
      &compact->new_value_buf_,
      &compact->value_changed_buf_);

  // The filter must report one delete flag and one changed flag per buffered
  // kv-pair. new_value_buf_ may be shorter: it only has an entry for each
  // pair whose value changed.
  assert(compact->to_delete_buf_.size() ==
         compact->key_buf_.size());
  assert(compact->to_delete_buf_.size() ==
         compact->existing_value_buf_.size());
  assert(compact->to_delete_buf_.size() ==
         compact->value_changed_buf_.size());

  // Index of the next unconsumed entry of new_value_buf_.
  size_t new_value_idx = 0;
  for (size_t i = 0; i < compact->to_delete_buf_.size(); ++i) {
    if (compact->to_delete_buf_[i]) {
      // Replace the entry with a delete marker: update the string buffer
      // directly, the Slice buffer points to the updated buffer.
      UpdateInternalKey(&compact->key_str_buf_[i][0],
                        compact->key_str_buf_[i].size(),
                        compact->ikey_buf_[i].sequence,
                        kTypeDeletion);

      // no value associated with delete
      compact->existing_value_buf_[i].clear();
      RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
    } else if (compact->value_changed_buf_[i]) {
      // The filter must have supplied a new value for every entry it
      // flagged as changed; otherwise this would read past the end.
      assert(new_value_idx < compact->new_value_buf_.size());
      compact->existing_value_buf_[i] =
          Slice(compact->new_value_buf_[new_value_idx++]);
    }
  }  // for
}
|
||||||
|
|
||||||
|
Status DBImpl::DoCompactionWork(CompactionState* compact,
|
||||||
|
DeletionState& deletion_state,
|
||||||
|
LogBuffer* log_buffer) {
|
||||||
|
assert(compact);
|
||||||
|
compact->CleanupBatchBuffer();
|
||||||
|
compact->CleanupMergedBuffer();
|
||||||
|
compact->cur_prefix_ = kNullString;
|
||||||
|
|
||||||
|
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
||||||
|
Log(options_.info_log,
|
||||||
|
"Compacting %d@%d + %d@%d files, score %.2f slots available %d",
|
||||||
|
compact->compaction->num_input_files(0),
|
||||||
|
compact->compaction->level(),
|
||||||
|
compact->compaction->num_input_files(1),
|
||||||
|
compact->compaction->output_level(),
|
||||||
|
compact->compaction->score(),
|
||||||
|
options_.max_background_compactions - bg_compaction_scheduled_);
|
||||||
|
char scratch[2345];
|
||||||
|
compact->compaction->Summary(scratch, sizeof(scratch));
|
||||||
|
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
|
||||||
|
|
||||||
|
assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
|
||||||
|
assert(compact->builder == nullptr);
|
||||||
|
assert(!compact->outfile);
|
||||||
|
|
||||||
|
SequenceNumber visible_at_tip = 0;
|
||||||
|
SequenceNumber earliest_snapshot;
|
||||||
|
SequenceNumber latest_snapshot = 0;
|
||||||
|
snapshots_.getAll(compact->existing_snapshots);
|
||||||
|
if (compact->existing_snapshots.size() == 0) {
|
||||||
|
// optimize for fast path if there are no snapshots
|
||||||
|
visible_at_tip = versions_->LastSequence();
|
||||||
|
earliest_snapshot = visible_at_tip;
|
||||||
|
} else {
|
||||||
|
latest_snapshot = compact->existing_snapshots.back();
|
||||||
|
// Add the current seqno as the 'latest' virtual
|
||||||
|
// snapshot to the end of this list.
|
||||||
|
compact->existing_snapshots.push_back(versions_->LastSequence());
|
||||||
|
earliest_snapshot = compact->existing_snapshots[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Is this compaction producing files at the bottommost level?
|
||||||
|
bool bottommost_level = compact->compaction->BottomMostLevel();
|
||||||
|
|
||||||
|
// Allocate the output file numbers before we release the lock
|
||||||
|
AllocateCompactionOutputFileNumbers(compact);
|
||||||
|
|
||||||
|
// Release mutex while we're actually doing the compaction work
|
||||||
|
mutex_.Unlock();
|
||||||
|
|
||||||
|
const uint64_t start_micros = env_->NowMicros();
|
||||||
|
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
|
||||||
|
input->SeekToFirst();
|
||||||
|
shared_ptr<Iterator> backup_input(
|
||||||
|
versions_->MakeInputIterator(compact->compaction));
|
||||||
|
backup_input->SeekToFirst();
|
||||||
|
|
||||||
|
Status status;
|
||||||
|
ParsedInternalKey ikey;
|
||||||
|
std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2
|
||||||
|
= nullptr;
|
||||||
|
auto context = compact->GetFilterContext();
|
||||||
|
compaction_filter_from_factory_v2 =
|
||||||
|
options_.compaction_filter_factory_v2->CreateCompactionFilterV2(context);
|
||||||
|
auto compaction_filter_v2 =
|
||||||
|
compaction_filter_from_factory_v2.get();
|
||||||
|
|
||||||
|
// temp_backup_input always point to the start of the current buffer
|
||||||
|
// temp_backup_input = backup_input;
|
||||||
|
// iterate through input,
|
||||||
|
// 1) buffer ineligible keys and value keys into 2 separate buffers;
|
||||||
|
// 2) send value_buffer to compaction filter and alternate the values;
|
||||||
|
// 3) merge value_buffer with ineligible_value_buffer;
|
||||||
|
// 4) run the modified "compaction" using the old for loop.
|
||||||
|
if (compaction_filter_v2) {
|
||||||
|
for (; backup_input->Valid() && !shutting_down_.Acquire_Load(); ) {
|
||||||
|
// Prioritize immutable compaction work
|
||||||
|
if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) {
|
||||||
|
const uint64_t imm_start = env_->NowMicros();
|
||||||
|
LogFlush(options_.info_log);
|
||||||
|
mutex_.Lock();
|
||||||
|
if (imm_.IsFlushPending()) {
|
||||||
|
FlushMemTableToOutputFile(nullptr, deletion_state, log_buffer);
|
||||||
|
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
|
||||||
|
}
|
||||||
|
mutex_.Unlock();
|
||||||
|
imm_micros += (env_->NowMicros() - imm_start);
|
||||||
|
}
|
||||||
|
|
||||||
|
Slice key = backup_input->key();
|
||||||
|
Slice value = backup_input->value();
|
||||||
|
|
||||||
|
const SliceTransform* transformer =
|
||||||
|
options_.compaction_filter_factory_v2->GetPrefixExtractor();
|
||||||
|
std::string key_prefix = transformer->Transform(key).ToString();
|
||||||
|
if (compact->cur_prefix_ == kNullString) {
|
||||||
|
compact->cur_prefix_ = key_prefix;
|
||||||
|
}
|
||||||
|
if (!ParseInternalKey(key, &ikey)) {
|
||||||
|
// log error
|
||||||
|
Log(options_.info_log, "Failed to parse key: %s",
|
||||||
|
key.ToString().c_str());
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
// If the prefix remains the same, keep buffering
|
||||||
|
if (key_prefix == compact->cur_prefix_) {
|
||||||
|
// Apply the compaction filter V2 to all the kv pairs sharing
|
||||||
|
// the same prefix
|
||||||
|
if (ikey.type == kTypeValue &&
|
||||||
|
(visible_at_tip || ikey.sequence > latest_snapshot)) {
|
||||||
|
// Buffer all keys sharing the same prefix for CompactionFilterV2
|
||||||
|
// Iterate through keys to check prefix
|
||||||
|
compact->BufferKeyValueSlices(key, value);
|
||||||
|
} else {
|
||||||
|
// buffer ineligible keys
|
||||||
|
compact->BufferOtherKeyValueSlices(key, value);
|
||||||
|
}
|
||||||
|
backup_input->Next();
|
||||||
|
continue;
|
||||||
|
// finish changing values for eligible keys
|
||||||
|
} else {
|
||||||
|
// Now prefix changes, this batch is done.
|
||||||
|
// Call compaction filter on the buffered values to change the value
|
||||||
|
if (compact->key_buf_.size() > 0) {
|
||||||
|
CallCompactionFilterV2(compact, compaction_filter_v2);
|
||||||
|
}
|
||||||
|
compact->cur_prefix_ = key_prefix;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge this batch of data (values + ineligible keys)
|
||||||
|
compact->MergeKeyValueSliceBuffer(&internal_comparator_);
|
||||||
|
|
||||||
|
// Done buffering for the current prefix. Spit it out to disk
|
||||||
|
// Now just iterate through all the kv-pairs
|
||||||
|
status = ProcessKeyValueCompaction(
|
||||||
|
visible_at_tip,
|
||||||
|
earliest_snapshot,
|
||||||
|
latest_snapshot,
|
||||||
|
deletion_state,
|
||||||
|
bottommost_level,
|
||||||
|
imm_micros,
|
||||||
|
input.get(),
|
||||||
|
compact,
|
||||||
|
true,
|
||||||
|
log_buffer);
|
||||||
|
|
||||||
|
if (!status.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// After writing the kv-pairs, we can safely remove the reference
|
||||||
|
// to the string buffer and clean them up
|
||||||
|
compact->CleanupBatchBuffer();
|
||||||
|
compact->CleanupMergedBuffer();
|
||||||
|
// Buffer the key that triggers the mismatch in prefix
|
||||||
|
if (ikey.type == kTypeValue &&
|
||||||
|
(visible_at_tip || ikey.sequence > latest_snapshot)) {
|
||||||
|
compact->BufferKeyValueSlices(key, value);
|
||||||
|
} else {
|
||||||
|
compact->BufferOtherKeyValueSlices(key, value);
|
||||||
|
}
|
||||||
|
backup_input->Next();
|
||||||
|
if (!backup_input->Valid()) {
|
||||||
|
// If this is the single last value, we need to merge it.
|
||||||
|
if (compact->key_buf_.size() > 0) {
|
||||||
|
CallCompactionFilterV2(compact, compaction_filter_v2);
|
||||||
|
}
|
||||||
|
compact->MergeKeyValueSliceBuffer(&internal_comparator_);
|
||||||
|
|
||||||
|
status = ProcessKeyValueCompaction(
|
||||||
|
visible_at_tip,
|
||||||
|
earliest_snapshot,
|
||||||
|
latest_snapshot,
|
||||||
|
deletion_state,
|
||||||
|
bottommost_level,
|
||||||
|
imm_micros,
|
||||||
|
input.get(),
|
||||||
|
compact,
|
||||||
|
true,
|
||||||
|
log_buffer);
|
||||||
|
|
||||||
|
compact->CleanupBatchBuffer();
|
||||||
|
compact->CleanupMergedBuffer();
|
||||||
|
}
|
||||||
|
} // done processing all prefix batches
|
||||||
|
// finish the last batch
|
||||||
|
if (compact->key_buf_.size() > 0) {
|
||||||
|
CallCompactionFilterV2(compact, compaction_filter_v2);
|
||||||
|
}
|
||||||
|
compact->MergeKeyValueSliceBuffer(&internal_comparator_);
|
||||||
|
status = ProcessKeyValueCompaction(
|
||||||
|
visible_at_tip,
|
||||||
|
earliest_snapshot,
|
||||||
|
latest_snapshot,
|
||||||
|
deletion_state,
|
||||||
|
bottommost_level,
|
||||||
|
imm_micros,
|
||||||
|
input.get(),
|
||||||
|
compact,
|
||||||
|
true,
|
||||||
|
log_buffer);
|
||||||
|
} // checking for compaction filter v2
|
||||||
|
|
||||||
|
if (!compaction_filter_v2) {
|
||||||
|
status = ProcessKeyValueCompaction(
|
||||||
|
visible_at_tip,
|
||||||
|
earliest_snapshot,
|
||||||
|
latest_snapshot,
|
||||||
|
deletion_state,
|
||||||
|
bottommost_level,
|
||||||
|
imm_micros,
|
||||||
|
input.get(),
|
||||||
|
compact,
|
||||||
|
false,
|
||||||
|
log_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
if (status.ok() && shutting_down_.Acquire_Load()) {
|
if (status.ok() && shutting_down_.Acquire_Load()) {
|
||||||
status = Status::ShutdownInProgress(
|
status = Status::ShutdownInProgress(
|
||||||
"Database shutdown started during compaction");
|
"Database shutdown started during compaction");
|
||||||
|
19
db/db_impl.h
19
db/db_impl.h
@ -36,6 +36,7 @@ class TableCache;
|
|||||||
class Version;
|
class Version;
|
||||||
class VersionEdit;
|
class VersionEdit;
|
||||||
class VersionSet;
|
class VersionSet;
|
||||||
|
class CompactionFilterV2;
|
||||||
|
|
||||||
class DBImpl : public DB {
|
class DBImpl : public DB {
|
||||||
public:
|
public:
|
||||||
@ -366,6 +367,24 @@ class DBImpl : public DB {
|
|||||||
DeletionState& deletion_state,
|
DeletionState& deletion_state,
|
||||||
LogBuffer* log_buffer);
|
LogBuffer* log_buffer);
|
||||||
|
|
||||||
|
// Call compaction filter if is_compaction_v2 is not true. Then iterate
|
||||||
|
// through input and compact the kv-pairs
|
||||||
|
Status ProcessKeyValueCompaction(
|
||||||
|
SequenceNumber visible_at_tip,
|
||||||
|
SequenceNumber earliest_snapshot,
|
||||||
|
SequenceNumber latest_snapshot,
|
||||||
|
DeletionState& deletion_state,
|
||||||
|
bool bottommost_level,
|
||||||
|
int64_t& imm_micros,
|
||||||
|
Iterator* input,
|
||||||
|
CompactionState* compact,
|
||||||
|
bool is_compaction_v2,
|
||||||
|
LogBuffer* log_buffer);
|
||||||
|
|
||||||
|
// Call compaction_filter_v2->Filter() on kv-pairs in compact
|
||||||
|
void CallCompactionFilterV2(CompactionState* compact,
|
||||||
|
CompactionFilterV2* compaction_filter_v2);
|
||||||
|
|
||||||
Status OpenCompactionOutputFile(CompactionState* compact);
|
Status OpenCompactionOutputFile(CompactionState* compact);
|
||||||
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
|
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
|
||||||
Status InstallCompactionResults(CompactionState* compact);
|
Status InstallCompactionResults(CompactionState* compact);
|
||||||
|
254
db/db_test.cc
254
db/db_test.cc
@ -2434,7 +2434,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
|
|||||||
: check_context_(check_context) {}
|
: check_context_(check_context) {}
|
||||||
|
|
||||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||||
const CompactionFilter::Context& context) override {
|
const CompactionFilterContext& context) override {
|
||||||
if (check_context_) {
|
if (check_context_) {
|
||||||
ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
|
ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
|
||||||
ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
|
ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
|
||||||
@ -2451,7 +2451,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
|
|||||||
class DeleteFilterFactory : public CompactionFilterFactory {
|
class DeleteFilterFactory : public CompactionFilterFactory {
|
||||||
public:
|
public:
|
||||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||||
const CompactionFilter::Context& context) override {
|
const CompactionFilterContext& context) override {
|
||||||
if (context.is_manual_compaction) {
|
if (context.is_manual_compaction) {
|
||||||
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
|
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
|
||||||
} else {
|
} else {
|
||||||
@ -2467,7 +2467,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
|
|||||||
explicit ChangeFilterFactory() {}
|
explicit ChangeFilterFactory() {}
|
||||||
|
|
||||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||||
const CompactionFilter::Context& context) override {
|
const CompactionFilterContext& context) override {
|
||||||
return std::unique_ptr<CompactionFilter>(new ChangeFilter());
|
return std::unique_ptr<CompactionFilter>(new ChangeFilter());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3507,7 +3507,7 @@ TEST(DBTest, CompactionFilterWithValueChange) {
|
|||||||
|
|
||||||
// verify that all keys now have the new value that
|
// verify that all keys now have the new value that
|
||||||
// was set by the compaction process.
|
// was set by the compaction process.
|
||||||
for (int i = 0; i < 100000; i++) {
|
for (int i = 0; i < 100001; i++) {
|
||||||
char key[100];
|
char key[100];
|
||||||
snprintf(key, sizeof(key), "B%010d", i);
|
snprintf(key, sizeof(key), "B%010d", i);
|
||||||
std::string newvalue = Get(key);
|
std::string newvalue = Get(key);
|
||||||
@ -3570,6 +3570,252 @@ TEST(DBTest, CompactionFilterContextManual) {
|
|||||||
delete iter;
|
delete iter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class KeepFilterV2 : public CompactionFilterV2 {
|
||||||
|
public:
|
||||||
|
virtual std::vector<bool> Filter(int level,
|
||||||
|
const SliceVector& keys,
|
||||||
|
const SliceVector& existing_values,
|
||||||
|
std::vector<std::string>* new_values,
|
||||||
|
std::vector<bool>* values_changed)
|
||||||
|
const override {
|
||||||
|
cfilter_count++;
|
||||||
|
std::vector<bool> ret;
|
||||||
|
new_values->clear();
|
||||||
|
values_changed->clear();
|
||||||
|
for (unsigned int i = 0; i < keys.size(); ++i) {
|
||||||
|
values_changed->push_back(false);
|
||||||
|
ret.push_back(false);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "KeepFilterV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class DeleteFilterV2 : public CompactionFilterV2 {
|
||||||
|
public:
|
||||||
|
virtual std::vector<bool> Filter(int level,
|
||||||
|
const SliceVector& keys,
|
||||||
|
const SliceVector& existing_values,
|
||||||
|
std::vector<std::string>* new_values,
|
||||||
|
std::vector<bool>* values_changed)
|
||||||
|
const override {
|
||||||
|
cfilter_count++;
|
||||||
|
new_values->clear();
|
||||||
|
values_changed->clear();
|
||||||
|
std::vector<bool> ret;
|
||||||
|
for (unsigned int i = 0; i < keys.size(); ++i) {
|
||||||
|
values_changed->push_back(false);
|
||||||
|
ret.push_back(true);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "DeleteFilterV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class ChangeFilterV2 : public CompactionFilterV2 {
|
||||||
|
public:
|
||||||
|
virtual std::vector<bool> Filter(int level,
|
||||||
|
const SliceVector& keys,
|
||||||
|
const SliceVector& existing_values,
|
||||||
|
std::vector<std::string>* new_values,
|
||||||
|
std::vector<bool>* values_changed)
|
||||||
|
const override {
|
||||||
|
std::vector<bool> ret;
|
||||||
|
new_values->clear();
|
||||||
|
values_changed->clear();
|
||||||
|
for (unsigned int i = 0; i < keys.size(); ++i) {
|
||||||
|
values_changed->push_back(true);
|
||||||
|
new_values->push_back(NEW_VALUE);
|
||||||
|
ret.push_back(false);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "ChangeFilterV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class KeepFilterFactoryV2 : public CompactionFilterFactoryV2 {
|
||||||
|
public:
|
||||||
|
explicit KeepFilterFactoryV2(const SliceTransform* prefix_extractor)
|
||||||
|
: CompactionFilterFactoryV2(prefix_extractor) { }
|
||||||
|
|
||||||
|
virtual std::unique_ptr<CompactionFilterV2>
|
||||||
|
CreateCompactionFilterV2(
|
||||||
|
const CompactionFilterContext& context) override {
|
||||||
|
return std::unique_ptr<CompactionFilterV2>(new KeepFilterV2());
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "KeepFilterFactoryV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class DeleteFilterFactoryV2 : public CompactionFilterFactoryV2 {
|
||||||
|
public:
|
||||||
|
explicit DeleteFilterFactoryV2(const SliceTransform* prefix_extractor)
|
||||||
|
: CompactionFilterFactoryV2(prefix_extractor) { }
|
||||||
|
|
||||||
|
virtual std::unique_ptr<CompactionFilterV2>
|
||||||
|
CreateCompactionFilterV2(
|
||||||
|
const CompactionFilterContext& context) override {
|
||||||
|
return std::unique_ptr<CompactionFilterV2>(new DeleteFilterV2());
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "DeleteFilterFactoryV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class ChangeFilterFactoryV2 : public CompactionFilterFactoryV2 {
|
||||||
|
public:
|
||||||
|
explicit ChangeFilterFactoryV2(const SliceTransform* prefix_extractor)
|
||||||
|
: CompactionFilterFactoryV2(prefix_extractor) { }
|
||||||
|
|
||||||
|
virtual std::unique_ptr<CompactionFilterV2>
|
||||||
|
CreateCompactionFilterV2(
|
||||||
|
const CompactionFilterContext& context) override {
|
||||||
|
return std::unique_ptr<CompactionFilterV2>(new ChangeFilterV2());
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "ChangeFilterFactoryV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Exercises the V2 (batched) compaction filter in two phases: first with a
// filter that answers `false` for every key (all entries must survive), then
// with one that answers `true` for every key (the database must end up empty).
TEST(DBTest, CompactionFilterV2) {
  const int kNumKeys = 100000;

  Options options = CurrentOptions();
  options.num_levels = 3;
  options.max_mem_compaction_level = 0;
  // Keys are grouped by their first 8 bytes.
  const SliceTransform* prefix_extractor = NewFixedPrefixTransform(8);
  options.compaction_filter_factory_v2 =
      std::make_shared<KeepFilterFactoryV2>(prefix_extractor);
  // In a testing environment the application compaction filter buffer can
  // only be flushed when universal compaction is in use.
  option_config_ = kUniversalCompaction;
  options.compaction_style = static_cast<rocksdb::CompactionStyle>(1);
  Reopen(&options);

  // Phase 1: write 100K keys; they end up in a few L0 files.
  const std::string value(10, 'x');
  for (int i = 0; i < kNumKeys; ++i) {
    char key_buf[100];
    snprintf(key_buf, sizeof(key_buf), "B%08d%010d", i, i);
    Put(key_buf, value);
  }

  dbfull()->TEST_FlushMemTable();

  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
  dbfull()->TEST_CompactRange(1, nullptr, nullptr);

  ASSERT_EQ(NumTableFilesAtLevel(0), 1);

  // Walk the internal keys, counting all entries and, separately, those
  // whose sequence number is non-zero.
  int nonzero_seq = 0;
  int num_entries = 0;
  Iterator* iter = dbfull()->TEST_NewInternalIterator();
  iter->SeekToFirst();
  ASSERT_OK(iter->status());
  for (; iter->Valid(); iter->Next()) {
    ParsedInternalKey ikey(Slice(), 0, kTypeValue);
    ikey.sequence = -1;
    ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
    ++num_entries;
    if (ikey.sequence != 0) {
      ++nonzero_seq;
    }
  }

  ASSERT_EQ(num_entries, 100000);
  // 1 snapshot only. Since we are using universal compaction,
  // the sequence number is cleared for better compression.
  ASSERT_EQ(nonzero_seq, 1);
  delete iter;

  // Phase 2: recreate the database with a compaction filter that asks for
  // every key to be deleted.
  options.compaction_filter_factory_v2 =
      std::make_shared<DeleteFilterFactoryV2>(prefix_extractor);
  options.create_if_missing = true;
  DestroyAndReopen(&options);

  // Write all the keys once again.
  for (int i = 0; i < kNumKeys; ++i) {
    char key_buf[100];
    snprintf(key_buf, sizeof(key_buf), "B%08d%010d", i, i);
    Put(key_buf, value);
  }

  dbfull()->TEST_FlushMemTable();
  ASSERT_NE(NumTableFilesAtLevel(0), 0);

  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
  dbfull()->TEST_CompactRange(1, nullptr, nullptr);
  ASSERT_EQ(NumTableFilesAtLevel(1), 0);

  // Scan the entire database to ensure that nothing is left.
  iter = db_->NewIterator(ReadOptions());
  int remaining = 0;
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    ++remaining;
  }

  ASSERT_EQ(remaining, 0);
  delete iter;
}
|
||||||
|
|
||||||
|
// Verifies that values rewritten by a V2 compaction filter are what readers
// observe once the compaction has run.
TEST(DBTest, CompactionFilterV2WithValueChange) {
  const int kNumKeys = 100001;

  Options options = CurrentOptions();
  options.num_levels = 3;
  options.max_mem_compaction_level = 0;
  const SliceTransform* prefix_extractor = NewFixedPrefixTransform(8);
  options.compaction_filter_factory_v2 =
      std::make_shared<ChangeFilterFactoryV2>(prefix_extractor);
  // In a testing environment the application compaction filter buffer can
  // only be flushed when universal compaction is in use.
  option_config_ = kUniversalCompaction;
  options.compaction_style = static_cast<rocksdb::CompactionStyle>(1);
  Reopen(&options);

  // Write 100K+1 keys; these are written to a few files in L0. We do this
  // so that the current snapshot points to the 100001st key. The compaction
  // filter is not invoked on keys that are visible via a snapshot because
  // we cannot delete them anyway.
  const std::string value(10, 'x');
  for (int i = 0; i < kNumKeys; ++i) {
    char key_buf[100];
    snprintf(key_buf, sizeof(key_buf), "B%08d%010d", i, i);
    Put(key_buf, value);
  }

  // Push all files to lower levels.
  dbfull()->TEST_FlushMemTable();
  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
  dbfull()->TEST_CompactRange(1, nullptr, nullptr);

  // Every key must now read back the value installed by the compaction
  // filter.
  for (int i = 0; i < kNumKeys; ++i) {
    char key_buf[100];
    snprintf(key_buf, sizeof(key_buf), "B%08d%010d", i, i);
    const std::string stored = Get(key_buf);
    ASSERT_EQ(stored.compare(NEW_VALUE), 0);
  }
}
|
||||||
|
|
||||||
TEST(DBTest, SparseMerge) {
|
TEST(DBTest, SparseMerge) {
|
||||||
do {
|
do {
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
|
@ -21,7 +21,7 @@ namespace rocksdb {
|
|||||||
// operands_ stores the list of merge operands encountered while merging.
|
// operands_ stores the list of merge operands encountered while merging.
|
||||||
// keys_[i] corresponds to operands_[i] for each i.
|
// keys_[i] corresponds to operands_[i] for each i.
|
||||||
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||||
bool at_bottom, Statistics* stats) {
|
bool at_bottom, Statistics* stats, int* steps) {
|
||||||
// Get a copy of the internal key, before it's invalidated by iter->Next()
|
// Get a copy of the internal key, before it's invalidated by iter->Next()
|
||||||
// Also maintain the list of merge operands seen.
|
// Also maintain the list of merge operands seen.
|
||||||
keys_.clear();
|
keys_.clear();
|
||||||
@ -42,6 +42,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||||||
bool hit_the_next_user_key = false;
|
bool hit_the_next_user_key = false;
|
||||||
ParsedInternalKey ikey;
|
ParsedInternalKey ikey;
|
||||||
std::string merge_result; // Temporary value for merge results
|
std::string merge_result; // Temporary value for merge results
|
||||||
|
if (steps) {
|
||||||
|
++(*steps);
|
||||||
|
}
|
||||||
for (iter->Next(); iter->Valid(); iter->Next()) {
|
for (iter->Next(); iter->Valid(); iter->Next()) {
|
||||||
assert(operands_.size() >= 1); // Should be invariants!
|
assert(operands_.size() >= 1); // Should be invariants!
|
||||||
assert(keys_.size() == operands_.size());
|
assert(keys_.size() == operands_.size());
|
||||||
@ -91,6 +94,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||||||
|
|
||||||
// move iter to the next entry (before doing anything else)
|
// move iter to the next entry (before doing anything else)
|
||||||
iter->Next();
|
iter->Next();
|
||||||
|
if (steps) {
|
||||||
|
++(*steps);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,6 +125,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||||||
|
|
||||||
// move iter to the next entry
|
// move iter to the next entry
|
||||||
iter->Next();
|
iter->Next();
|
||||||
|
if (steps) {
|
||||||
|
++(*steps);
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -133,6 +142,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||||||
// request or later did a partial merge.
|
// request or later did a partial merge.
|
||||||
keys_.push_front(iter->key().ToString());
|
keys_.push_front(iter->key().ToString());
|
||||||
operands_.push_front(iter->value().ToString());
|
operands_.push_front(iter->value().ToString());
|
||||||
|
if (steps) {
|
||||||
|
++(*steps);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +47,8 @@ class MergeHelper {
|
|||||||
// at_bottom: (IN) true if the iterator covers the bottom level, which means
|
// at_bottom: (IN) true if the iterator covers the bottom level, which means
|
||||||
// we could reach the start of the history of this user key.
|
// we could reach the start of the history of this user key.
|
||||||
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
|
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
|
||||||
bool at_bottom = false, Statistics* stats = nullptr);
|
bool at_bottom = false, Statistics* stats = nullptr,
|
||||||
|
int* steps = nullptr);
|
||||||
|
|
||||||
// Query the merge result
|
// Query the merge result
|
||||||
// These are valid until the next MergeUntil call
|
// These are valid until the next MergeUntil call
|
||||||
|
@ -10,26 +10,27 @@
|
|||||||
#define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
|
#define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
class Slice;
|
class Slice;
|
||||||
|
class SliceTransform;
|
||||||
|
|
||||||
|
// Describes the compaction run for which a filter is being created. It is
// the argument type of the CreateCompactionFilter / CreateCompactionFilterV2
// factory methods.
struct CompactionFilterContext {
  // True when this compaction run includes all data files.
  bool is_full_compaction;
  // True when the compaction was requested by the client; false when it is
  // occurring as an automatic compaction process.
  bool is_manual_compaction;
};
|
||||||
|
|
||||||
// CompactionFilter allows an application to modify/delete a key-value at
|
// CompactionFilter allows an application to modify/delete a key-value at
|
||||||
// the time of compaction.
|
// the time of compaction.
|
||||||
|
|
||||||
class CompactionFilter {
|
class CompactionFilter {
|
||||||
public:
|
public:
|
||||||
|
|
||||||
// Context information of a compaction run
|
|
||||||
struct Context {
|
|
||||||
// Does this compaction run include all data files
|
|
||||||
bool is_full_compaction;
|
|
||||||
// Is this compaction requested by the client (true),
|
|
||||||
// or is it occurring as an automatic compaction process
|
|
||||||
bool is_manual_compaction;
|
|
||||||
};
|
|
||||||
|
|
||||||
virtual ~CompactionFilter() {}
|
virtual ~CompactionFilter() {}
|
||||||
|
|
||||||
// The compaction process invokes this
|
// The compaction process invokes this
|
||||||
@ -64,14 +65,47 @@ class CompactionFilter {
|
|||||||
virtual const char* Name() const = 0;
|
virtual const char* Name() const = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// CompactionFilterV2 that buffers kv pairs sharing the same prefix and let
|
||||||
|
// application layer to make individual decisions for all the kv pairs in the
|
||||||
|
// buffer.
|
||||||
|
class CompactionFilterV2 {
|
||||||
|
public:
|
||||||
|
virtual ~CompactionFilterV2() {}
|
||||||
|
|
||||||
|
// The compaction process invokes this method for all the kv pairs
|
||||||
|
// sharing the same prefix. It is a "roll-up" version of CompactionFilter.
|
||||||
|
//
|
||||||
|
// Each entry in the return vector indicates if the corresponding kv should
|
||||||
|
// be preserved in the output of this compaction run. The application can
|
||||||
|
// inspect the exisitng values of the keys and make decision based on it.
|
||||||
|
//
|
||||||
|
// When a value is to be preserved, the application has the option
|
||||||
|
// to modify the entry in existing_values and pass it back through an entry
|
||||||
|
// in new_values. A corresponding values_changed entry needs to be set to
|
||||||
|
// true in this case. Note that the new_values vector contains only changed
|
||||||
|
// values, i.e. new_values.size() <= values_changed.size().
|
||||||
|
//
|
||||||
|
typedef std::vector<Slice> SliceVector;
|
||||||
|
virtual std::vector<bool> Filter(int level,
|
||||||
|
const SliceVector& keys,
|
||||||
|
const SliceVector& existing_values,
|
||||||
|
std::vector<std::string>* new_values,
|
||||||
|
std::vector<bool>* values_changed)
|
||||||
|
const = 0;
|
||||||
|
|
||||||
|
// Returns a name that identifies this compaction filter.
|
||||||
|
// The name will be printed to LOG file on start up for diagnosis.
|
||||||
|
virtual const char* Name() const = 0;
|
||||||
|
};
|
||||||
|
|
||||||
// Each compaction will create a new CompactionFilter allowing the
|
// Each compaction will create a new CompactionFilter allowing the
|
||||||
// application to know about different compactions
|
// application to know about different compactions
|
||||||
class CompactionFilterFactory {
|
class CompactionFilterFactory {
|
||||||
public:
|
public:
|
||||||
virtual ~CompactionFilterFactory() { };
|
virtual ~CompactionFilterFactory() { }
|
||||||
|
|
||||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||||
const CompactionFilter::Context& context) = 0;
|
const CompactionFilterContext& context) = 0;
|
||||||
|
|
||||||
// Returns a name that identifies this compaction filter factory.
|
// Returns a name that identifies this compaction filter factory.
|
||||||
virtual const char* Name() const = 0;
|
virtual const char* Name() const = 0;
|
||||||
@ -82,7 +116,7 @@ class CompactionFilterFactory {
|
|||||||
class DefaultCompactionFilterFactory : public CompactionFilterFactory {
|
class DefaultCompactionFilterFactory : public CompactionFilterFactory {
|
||||||
public:
|
public:
|
||||||
virtual std::unique_ptr<CompactionFilter>
|
virtual std::unique_ptr<CompactionFilter>
|
||||||
CreateCompactionFilter(const CompactionFilter::Context& context) override {
|
CreateCompactionFilter(const CompactionFilterContext& context) override {
|
||||||
return std::unique_ptr<CompactionFilter>(nullptr);
|
return std::unique_ptr<CompactionFilter>(nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,6 +125,65 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Each compaction will create a new CompactionFilterV2
|
||||||
|
//
|
||||||
|
// CompactionFilterFactoryV2 enables application to specify a prefix and use
|
||||||
|
// CompactionFilterV2 to filter kv-pairs in batches. Each batch contains all
|
||||||
|
// the kv-pairs sharing the same prefix.
|
||||||
|
//
|
||||||
|
// This is useful for applications that require grouping kv-pairs in
|
||||||
|
// compaction filter to make a purge/no-purge decision. For example, if the
|
||||||
|
// key prefix is user id and the rest of key represents the type of value.
|
||||||
|
// This batching filter will come in handy if the application's compaction
|
||||||
|
// filter requires knowledge of all types of values for any user id.
|
||||||
|
//
|
||||||
|
class CompactionFilterFactoryV2 {
|
||||||
|
public:
|
||||||
|
explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor)
|
||||||
|
: prefix_extractor_(prefix_extractor) { }
|
||||||
|
|
||||||
|
virtual ~CompactionFilterFactoryV2() { }
|
||||||
|
|
||||||
|
virtual std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2(
|
||||||
|
const CompactionFilterContext& context) = 0;
|
||||||
|
|
||||||
|
// Returns a name that identifies this compaction filter factory.
|
||||||
|
virtual const char* Name() const = 0;
|
||||||
|
|
||||||
|
const SliceTransform* GetPrefixExtractor() const {
|
||||||
|
return prefix_extractor_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetPrefixExtractor(const SliceTransform* prefix_extractor) {
|
||||||
|
prefix_extractor_ = prefix_extractor;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Prefix extractor for compaction filter v2
|
||||||
|
// Keys sharing the same prefix will be buffered internally.
|
||||||
|
// Client can implement a Filter callback function to operate on the buffer
|
||||||
|
const SliceTransform* prefix_extractor_;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Default implementaion of CompactionFilterFactoryV2 which does not
|
||||||
|
// return any filter
|
||||||
|
class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 {
|
||||||
|
public:
|
||||||
|
explicit DefaultCompactionFilterFactoryV2(
|
||||||
|
const SliceTransform* prefix_extractor)
|
||||||
|
: CompactionFilterFactoryV2(prefix_extractor) { }
|
||||||
|
|
||||||
|
virtual std::unique_ptr<CompactionFilterV2>
|
||||||
|
CreateCompactionFilterV2(
|
||||||
|
const CompactionFilterContext& context) override {
|
||||||
|
return std::unique_ptr<CompactionFilterV2>(nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual const char* Name() const override {
|
||||||
|
return "DefaultCompactionFilterFactoryV2";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
#endif // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
|
#endif // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
|
||||||
|
@ -214,7 +214,7 @@ class Env {
|
|||||||
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
|
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
|
||||||
|
|
||||||
// Wait for all threads started by StartThread to terminate.
|
// Wait for all threads started by StartThread to terminate.
|
||||||
virtual void WaitForJoin() = 0;
|
virtual void WaitForJoin() {}
|
||||||
|
|
||||||
// Get thread pool queue length for specific thread pool.
|
// Get thread pool queue length for specific thread pool.
|
||||||
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
|
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
|
||||||
|
@ -22,6 +22,7 @@ namespace rocksdb {
|
|||||||
class Cache;
|
class Cache;
|
||||||
class CompactionFilter;
|
class CompactionFilter;
|
||||||
class CompactionFilterFactory;
|
class CompactionFilterFactory;
|
||||||
|
class CompactionFilterFactoryV2;
|
||||||
class Comparator;
|
class Comparator;
|
||||||
class Env;
|
class Env;
|
||||||
enum InfoLogLevel : unsigned char;
|
enum InfoLogLevel : unsigned char;
|
||||||
@ -123,6 +124,10 @@ struct Options {
|
|||||||
// Default: a factory that doesn't provide any object
|
// Default: a factory that doesn't provide any object
|
||||||
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
|
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
|
||||||
|
|
||||||
|
// Version TWO of the compaction_filter_factory
|
||||||
|
// It supports rolling compaction
|
||||||
|
std::shared_ptr<CompactionFilterFactoryV2> compaction_filter_factory_v2;
|
||||||
|
|
||||||
// If true, the database will be created if it is missing.
|
// If true, the database will be created if it is missing.
|
||||||
// Default: false
|
// Default: false
|
||||||
bool create_if_missing;
|
bool create_if_missing;
|
||||||
|
@ -32,6 +32,9 @@ Options::Options()
|
|||||||
compaction_filter(nullptr),
|
compaction_filter(nullptr),
|
||||||
compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>(
|
compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>(
|
||||||
new DefaultCompactionFilterFactory())),
|
new DefaultCompactionFilterFactory())),
|
||||||
|
compaction_filter_factory_v2(
|
||||||
|
new DefaultCompactionFilterFactoryV2(
|
||||||
|
NewFixedPrefixTransform(8))),
|
||||||
create_if_missing(false),
|
create_if_missing(false),
|
||||||
error_if_exists(false),
|
error_if_exists(false),
|
||||||
paranoid_checks(false),
|
paranoid_checks(false),
|
||||||
@ -131,6 +134,8 @@ Options::Dump(Logger* log) const
|
|||||||
compaction_filter? compaction_filter->Name() : "None");
|
compaction_filter? compaction_filter->Name() : "None");
|
||||||
Log(log," Options.compaction_filter_factory: %s",
|
Log(log," Options.compaction_filter_factory: %s",
|
||||||
compaction_filter_factory->Name());
|
compaction_filter_factory->Name());
|
||||||
|
Log(log, " Options.compaction_filter_factory_v2: %s",
|
||||||
|
compaction_filter_factory_v2->Name());
|
||||||
Log(log," Options.memtable_factory: %s",
|
Log(log," Options.memtable_factory: %s",
|
||||||
memtable_factory->Name());
|
memtable_factory->Name());
|
||||||
Log(log," Options.table_factory: %s", table_factory->Name());
|
Log(log," Options.table_factory: %s", table_factory->Name());
|
||||||
|
@ -192,7 +192,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
|
|||||||
user_comp_filter_factory_(comp_filter_factory) { }
|
user_comp_filter_factory_(comp_filter_factory) { }
|
||||||
|
|
||||||
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
||||||
const CompactionFilter::Context& context) {
|
const CompactionFilterContext& context) {
|
||||||
return std::unique_ptr<TtlCompactionFilter>(
|
return std::unique_ptr<TtlCompactionFilter>(
|
||||||
new TtlCompactionFilter(
|
new TtlCompactionFilter(
|
||||||
ttl_,
|
ttl_,
|
||||||
|
@ -285,7 +285,7 @@ class TtlTest {
|
|||||||
|
|
||||||
virtual std::unique_ptr<CompactionFilter>
|
virtual std::unique_ptr<CompactionFilter>
|
||||||
CreateCompactionFilter(
|
CreateCompactionFilter(
|
||||||
const CompactionFilter::Context& context) override {
|
const CompactionFilterContext& context) override {
|
||||||
return std::unique_ptr<CompactionFilter>(
|
return std::unique_ptr<CompactionFilter>(
|
||||||
new TestFilter(kSampleSize_, kNewValue_));
|
new TestFilter(kSampleSize_, kNewValue_));
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user