Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	include/rocksdb/options.h
	util/options.cc
Igor Canadi 2014-03-25 11:09:40 -07:00
commit e8168382c4
29 changed files with 1280 additions and 281 deletions


@ -13,7 +13,7 @@
* Removed arena.h from public header files.
* By default, checksums are verified on every read from database
* Added is_manual_compaction to CompactionFilter::Context
* Added "virtual void WaitForJoin() = 0" in class Env
* Added "virtual void WaitForJoin()" in class Env. Default operation is no-op.
* Removed BackupEngine::DeleteBackupsNewerThan() function
* Added new option -- verify_checksums_in_compaction
* Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership)
@ -21,11 +21,13 @@
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks
if file system state matches DB state (file existence and file sizes)
* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2
### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,
we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it.
* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, processes them in batches, and returns the batched results back to the DB.
## 2.7.0 (01/28/2014)


@ -73,6 +73,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
MergeHelper merge(internal_comparator.user_comparator(),
options.merge_operator.get(), options.info_log.get(),
options.min_partial_merge_operands,
true /* internal key corruption is not ok */);
if (purge) {

db/c.cc

@ -159,12 +159,10 @@ struct rocksdb_mergeoperator_t : public MergeOperator {
const char* const* operands_list, const size_t* operands_list_length,
int num_operands,
unsigned char* success, size_t* new_value_length);
char* (*partial_merge_)(
void*,
const char* key, size_t key_length,
const char* left_operand, size_t left_operand_length,
const char* right_operand, size_t right_operand_length,
unsigned char* success, size_t* new_value_length);
char* (*partial_merge_)(void*, const char* key, size_t key_length,
const char* const* operands_list,
const size_t* operands_list_length, int num_operands,
unsigned char* success, size_t* new_value_length);
void (*delete_value_)(
void*,
const char* value, size_t value_length);
@ -219,21 +217,23 @@ struct rocksdb_mergeoperator_t : public MergeOperator {
return success;
}
virtual bool PartialMerge(
const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const {
size_t operand_count = operand_list.size();
std::vector<const char*> operand_pointers(operand_count);
std::vector<size_t> operand_sizes(operand_count);
for (size_t i = 0; i < operand_count; ++i) {
Slice operand(operand_list[i]);
operand_pointers[i] = operand.data();
operand_sizes[i] = operand.size();
}
unsigned char success;
size_t new_value_len;
char* tmp_new_value = (*partial_merge_)(
state_,
key.data(), key.size(),
left_operand.data(), left_operand.size(),
right_operand.data(), right_operand.size(),
&success, &new_value_len);
state_, key.data(), key.size(), &operand_pointers[0], &operand_sizes[0],
operand_count, &success, &new_value_len);
new_value->assign(tmp_new_value, new_value_len);
if (delete_value_ != nullptr) {
@ -1094,24 +1094,18 @@ rocksdb_filterpolicy_t* rocksdb_filterpolicy_create_bloom(int bits_per_key) {
}
rocksdb_mergeoperator_t* rocksdb_mergeoperator_create(
void* state,
void (*destructor)(void*),
char* (*full_merge)(
void*,
const char* key, size_t key_length,
const char* existing_value, size_t existing_value_length,
const char* const* operands_list, const size_t* operands_list_length,
int num_operands,
unsigned char* success, size_t* new_value_length),
char* (*partial_merge)(
void*,
const char* key, size_t key_length,
const char* left_operand, size_t left_operand_length,
const char* right_operand, size_t right_operand_length,
unsigned char* success, size_t* new_value_length),
void (*delete_value)(
void*,
const char* value, size_t value_length),
void* state, void (*destructor)(void*),
char* (*full_merge)(void*, const char* key, size_t key_length,
const char* existing_value,
size_t existing_value_length,
const char* const* operands_list,
const size_t* operands_list_length, int num_operands,
unsigned char* success, size_t* new_value_length),
char* (*partial_merge)(void*, const char* key, size_t key_length,
const char* const* operands_list,
const size_t* operands_list_length, int num_operands,
unsigned char* success, size_t* new_value_length),
void (*delete_value)(void*, const char* value, size_t value_length),
const char* (*name)(void*)) {
rocksdb_mergeoperator_t* result = new rocksdb_mergeoperator_t;
result->state_ = state;
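A minimal callback matching the new multi-operand partial_merge signature might look like the sketch below (illustrative only: the concatenation semantics and the function name are invented, and the returned buffer is assumed to be released by the wrapper via free() or the delete_value callback):

#include <stdlib.h>
#include <string.h>

static char* ConcatPartialMerge(void* state, const char* key, size_t key_length,
                                const char* const* operands_list,
                                const size_t* operands_list_length,
                                int num_operands, unsigned char* success,
                                size_t* new_value_length) {
  (void)state; (void)key; (void)key_length;
  /* Concatenate all operands into one malloc'd buffer. */
  size_t total = 0;
  for (int i = 0; i < num_operands; i++) {
    total += operands_list_length[i];
  }
  char* result = (char*)malloc(total);
  size_t offset = 0;
  for (int i = 0; i < num_operands; i++) {
    memcpy(result + offset, operands_list[i], operands_list_length[i]);
    offset += operands_list_length[i];
  }
  *success = 1;
  *new_value_length = total;
  return result;
}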


@ -175,8 +175,8 @@ static char* MergeOperatorFullMerge(
static char* MergeOperatorPartialMerge(
void* arg,
const char* key, size_t key_length,
const char* left_operand, size_t left_operand_length,
const char* right_operand, size_t right_operand_length,
const char* const* operands_list, const size_t* operands_list_length,
int num_operands,
unsigned char* success, size_t* new_value_length) {
*new_value_length = 4;
*success = 1;


@ -70,6 +70,7 @@
namespace rocksdb {
const std::string default_column_family_name("default");
const std::string kNullString = "NULL";
void DumpLeveldbBuildVersion(Logger * log);
@ -118,12 +119,129 @@ struct DBImpl::CompactionState {
}
// Create a client visible context of this compaction
CompactionFilter::Context GetFilterContext() {
CompactionFilter::Context context;
CompactionFilterContext GetFilterContext() {
CompactionFilterContext context;
context.is_full_compaction = compaction->IsFullCompaction();
context.is_manual_compaction = compaction->IsManualCompaction();
return context;
}
std::vector<Slice> key_buf_;
std::vector<Slice> existing_value_buf_;
std::vector<std::string> key_str_buf_;
std::vector<std::string> existing_value_str_buf_;
// new_value_buf_ will only be appended if a value changes
std::vector<std::string> new_value_buf_;
// if values_changed_buf_[i] is true
// new_value_buf_ will add a new entry with the changed value
std::vector<bool> value_changed_buf_;
// to_delete_buf_[i] is true iff key_buf_[i] is deleted
std::vector<bool> to_delete_buf_;
// buffer for the parsed internal keys, the string buffer is backed
// by key_str_buf_
std::vector<ParsedInternalKey> ikey_buf_;
std::vector<Slice> other_key_buf_;
std::vector<Slice> other_value_buf_;
std::vector<std::string> other_key_str_buf_;
std::vector<std::string> other_value_str_buf_;
std::vector<Slice> combined_key_buf_;
std::vector<Slice> combined_value_buf_;
std::string cur_prefix_;
// Buffers the kv-pair that will be run through compaction filter V2
// in the future.
void BufferKeyValueSlices(const Slice& key, const Slice& value) {
key_str_buf_.emplace_back(key.ToString());
existing_value_str_buf_.emplace_back(value.ToString());
key_buf_.emplace_back(Slice(key_str_buf_.back()));
existing_value_buf_.emplace_back(Slice(existing_value_str_buf_.back()));
ParsedInternalKey ikey;
ParseInternalKey(key_buf_.back(), &ikey);
ikey_buf_.emplace_back(ikey);
}
// Buffers the kv-pair that will not be run through compaction filter V2
// in the future.
void BufferOtherKeyValueSlices(const Slice& key, const Slice& value) {
other_key_str_buf_.emplace_back(key.ToString());
other_value_str_buf_.emplace_back(value.ToString());
other_key_buf_.emplace_back(Slice(other_key_str_buf_.back()));
other_value_buf_.emplace_back(Slice(other_value_str_buf_.back()));
}
// Add a kv-pair to the combined buffer
void AddToCombinedKeyValueSlices(const Slice& key, const Slice& value) {
// The real strings are stored in the batch buffers
combined_key_buf_.emplace_back(key);
combined_value_buf_.emplace_back(value);
}
// Merging the two buffers
void MergeKeyValueSliceBuffer(const InternalKeyComparator* comparator) {
size_t i = 0;
size_t j = 0;
size_t total_size = key_buf_.size() + other_key_buf_.size();
combined_key_buf_.reserve(total_size);
combined_value_buf_.reserve(total_size);
while (i + j < total_size) {
int comp_res = 0;
if (i < key_buf_.size() && j < other_key_buf_.size()) {
comp_res = comparator->Compare(key_buf_[i], other_key_buf_[j]);
} else if (i >= key_buf_.size() && j < other_key_buf_.size()) {
comp_res = 1;
} else if (j >= other_key_buf_.size() && i < key_buf_.size()) {
comp_res = -1;
}
if (comp_res > 0) {
AddToCombinedKeyValueSlices(other_key_buf_[j], other_value_buf_[j]);
j++;
} else if (comp_res < 0) {
AddToCombinedKeyValueSlices(key_buf_[i], existing_value_buf_[i]);
i++;
}
}
}
void CleanupBatchBuffer() {
to_delete_buf_.clear();
key_buf_.clear();
existing_value_buf_.clear();
key_str_buf_.clear();
existing_value_str_buf_.clear();
new_value_buf_.clear();
value_changed_buf_.clear();
ikey_buf_.clear();
to_delete_buf_.shrink_to_fit();
key_buf_.shrink_to_fit();
existing_value_buf_.shrink_to_fit();
key_str_buf_.shrink_to_fit();
existing_value_str_buf_.shrink_to_fit();
new_value_buf_.shrink_to_fit();
value_changed_buf_.shrink_to_fit();
ikey_buf_.shrink_to_fit();
other_key_buf_.clear();
other_value_buf_.clear();
other_key_str_buf_.clear();
other_value_str_buf_.clear();
other_key_buf_.shrink_to_fit();
other_value_buf_.shrink_to_fit();
other_key_str_buf_.shrink_to_fit();
other_value_str_buf_.shrink_to_fit();
}
void CleanupMergedBuffer() {
combined_key_buf_.clear();
combined_value_buf_.clear();
combined_key_buf_.shrink_to_fit();
combined_value_buf_.shrink_to_fit();
}
};
namespace {
@ -2346,69 +2464,32 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
return 0;
}
Status DBImpl::DoCompactionWork(CompactionState* compact,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
assert(compact);
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
ColumnFamilyData* cfd = compact->compaction->column_family_data();
Log(options_.info_log,
"[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
cfd->GetID(), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->num_input_files(1),
compact->compaction->output_level(), compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(compact->compaction->input_version()->NumLevelFiles(
compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(!compact->outfile);
SequenceNumber visible_at_tip = 0;
SequenceNumber earliest_snapshot;
SequenceNumber latest_snapshot = 0;
snapshots_.getAll(compact->existing_snapshots);
if (compact->existing_snapshots.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip = versions_->LastSequence();
earliest_snapshot = visible_at_tip;
} else {
latest_snapshot = compact->existing_snapshots.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
compact->existing_snapshots.push_back(versions_->LastSequence());
earliest_snapshot = compact->existing_snapshots[0];
}
// Is this compaction producing files at the bottommost level?
bool bottommost_level = compact->compaction->BottomMostLevel();
// Allocate the output file numbers before we release the lock
AllocateCompactionOutputFileNumbers(compact);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
// flush log buffer immediately after releasing the mutex
log_buffer->FlushBufferToLog();
const uint64_t start_micros = env_->NowMicros();
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
input->SeekToFirst();
Status DBImpl::ProcessKeyValueCompaction(
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot,
DeletionState& deletion_state,
bool bottommost_level,
int64_t& imm_micros,
Iterator* input,
CompactionState* compact,
bool is_compaction_v2,
LogBuffer* log_buffer) {
size_t combined_idx = 0;
Status status;
std::string compaction_filter_value;
ParsedInternalKey ikey;
std::string current_user_key;
bool has_current_user_key = false;
std::vector<char> delete_key; // for compaction filter
SequenceNumber last_sequence_for_key __attribute__((unused)) =
kMaxSequenceNumber;
SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
std::string compaction_filter_value;
std::vector<char> delete_key; // for compaction filter
ColumnFamilyData* cfd = compact->compaction->column_family_data();
MergeHelper merge(
cfd->user_comparator(), cfd->options()->merge_operator.get(),
options_.info_log.get(), false /* internal key corruption is expected */);
options_.info_log.get(), cfd->options()->min_partial_merge_operands,
false /* internal key corruption is expected */);
auto compaction_filter = cfd->options()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (!compaction_filter) {
@ -2419,14 +2500,32 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compaction_filter = compaction_filter_from_factory.get();
}
while (input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
Slice key = input->key();
Slice value = input->value();
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
Slice key;
Slice value;
// If is_compaction_v2 is on, kv-pairs are read from the prefix batch.
// This prefix batch should contain the results of calling
// compaction_filter_v2.
//
// If is_compaction_v2 is off, this function will go through all the
// kv-pairs in input.
if (!is_compaction_v2) {
key = input->key();
value = input->value();
} else {
if (combined_idx >= compact->combined_key_buf_.size()) {
break;
}
assert(combined_idx < compact->combined_key_buf_.size());
key = compact->combined_key_buf_[combined_idx];
value = compact->combined_value_buf_[combined_idx];
++combined_idx;
}
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input.get());
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
@ -2452,9 +2551,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber;
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter &&
if (compaction_filter && !is_compaction_v2 &&
ikey.type == kTypeValue &&
(visible_at_tip || ikey.sequence > latest_snapshot)) {
// If the user has specified a compaction filter and the sequence
@ -2472,7 +2570,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
delete_key.assign(key.data(), key.data() + key.size());
// convert it to a delete
UpdateInternalKey(&delete_key[0], delete_key.size(),
ikey.sequence, kTypeDeletion);
ikey.sequence, kTypeDeletion);
// anchor the key again
key = Slice(&delete_key[0], delete_key.size());
// needed because ikey is backed by key
@ -2493,8 +2591,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
SequenceNumber visible = visible_at_tip ?
visible_at_tip :
findEarliestVisibleSnapshot(ikey.sequence,
compact->existing_snapshots,
&prev_snapshot);
compact->existing_snapshots,
&prev_snapshot);
if (visible_in_snapshot == visible) {
// If the earliest snapshot is which this key is visible in
@ -2506,8 +2604,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
drop = true; // (A)
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_NEWER_ENTRY);
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= earliest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
ikey.sequence <= earliest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
@ -2524,8 +2622,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// object to minimize change to the existing flow. It turns out this
// logic could also be nicely re-used for memtable flush purge
// optimization in BuildTable.
merge.MergeUntil(input.get(), prev_snapshot, bottommost_level,
options_.statistics.get());
int steps = 0;
merge.MergeUntil(input, prev_snapshot, bottommost_level,
options_.statistics.get(), &steps);
// Skip the Merge ops
combined_idx = combined_idx - 1 + steps;
current_entry_is_merging = true;
if (merge.IsSuccess()) {
// Successfully found Put/Delete/(end-of-key-range) while merging
@ -2627,7 +2729,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input.get());
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
@ -2664,9 +2766,269 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
}
return status;
}
void DBImpl::CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2) {
if (compact == nullptr || compaction_filter_v2 == nullptr) {
return;
}
std::vector<Slice> user_key_buf;
for (const auto& key : compact->ikey_buf_) {
user_key_buf.emplace_back(key.user_key);
}
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter.
// If the return value of the compaction filter is true, replace
// the entry with a delete marker.
compact->to_delete_buf_ = compaction_filter_v2->Filter(
compact->compaction->level(),
user_key_buf, compact->existing_value_buf_,
&compact->new_value_buf_,
&compact->value_changed_buf_);
// new_value_buf_.size() <= to_delete_buf_.size(). "=" iff the filter
// changed the value of every kv-pair in this compaction run.
assert(compact->to_delete_buf_.size() ==
compact->key_buf_.size());
assert(compact->to_delete_buf_.size() ==
compact->existing_value_buf_.size());
assert(compact->to_delete_buf_.size() ==
compact->value_changed_buf_.size());
int new_value_idx = 0;
for (unsigned int i = 0; i < compact->to_delete_buf_.size(); ++i) {
if (compact->to_delete_buf_[i]) {
// update the string buffer directly
// the Slice buffer points to the updated buffer
UpdateInternalKey(&compact->key_str_buf_[i][0],
compact->key_str_buf_[i].size(),
compact->ikey_buf_[i].sequence,
kTypeDeletion);
// no value associated with delete
compact->existing_value_buf_[i].clear();
RecordTick(options_.statistics.get(), COMPACTION_KEY_DROP_USER);
} else if (compact->value_changed_buf_[i]) {
compact->existing_value_buf_[i] =
Slice(compact->new_value_buf_[new_value_idx++]);
}
} // for
}
Status DBImpl::DoCompactionWork(CompactionState* compact,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
assert(compact);
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
compact->cur_prefix_ = kNullString;
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
ColumnFamilyData* cfd = compact->compaction->column_family_data();
Log(options_.info_log,
"[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
cfd->GetID(), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->num_input_files(1),
compact->compaction->output_level(), compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(cfd->current()->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(!compact->outfile);
SequenceNumber visible_at_tip = 0;
SequenceNumber earliest_snapshot;
SequenceNumber latest_snapshot = 0;
snapshots_.getAll(compact->existing_snapshots);
if (compact->existing_snapshots.size() == 0) {
// optimize for fast path if there are no snapshots
visible_at_tip = versions_->LastSequence();
earliest_snapshot = visible_at_tip;
} else {
latest_snapshot = compact->existing_snapshots.back();
// Add the current seqno as the 'latest' virtual
// snapshot to the end of this list.
compact->existing_snapshots.push_back(versions_->LastSequence());
earliest_snapshot = compact->existing_snapshots[0];
}
// Is this compaction producing files at the bottommost level?
bool bottommost_level = compact->compaction->BottomMostLevel();
// Allocate the output file numbers before we release the lock
AllocateCompactionOutputFileNumbers(compact);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
const uint64_t start_micros = env_->NowMicros();
unique_ptr<Iterator> input(versions_->MakeInputIterator(compact->compaction));
input->SeekToFirst();
shared_ptr<Iterator> backup_input(
versions_->MakeInputIterator(compact->compaction));
backup_input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2
= nullptr;
auto context = compact->GetFilterContext();
compaction_filter_from_factory_v2 =
cfd->options()->compaction_filter_factory_v2->CreateCompactionFilterV2(
context);
auto compaction_filter_v2 =
compaction_filter_from_factory_v2.get();
// temp_backup_input always point to the start of the current buffer
// temp_backup_input = backup_input;
// iterate through input,
// 1) buffer ineligible keys and value keys into 2 separate buffers;
// 2) send value_buffer to compaction filter and alter the values;
// 3) merge value_buffer with ineligible_value_buffer;
// 4) run the modified "compaction" using the old for loop.
if (compaction_filter_v2) {
while (backup_input->Valid() && !shutting_down_.Acquire_Load() &&
!cfd->IsDropped()) {
Slice key = backup_input->key();
Slice value = backup_input->value();
const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
std::string key_prefix = transformer->Transform(key).ToString();
if (compact->cur_prefix_ == kNullString) {
compact->cur_prefix_ = key_prefix;
}
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(options_.info_log, "Failed to parse key: %s",
key.ToString().c_str());
continue;
} else {
// If the prefix remains the same, keep buffering
if (key_prefix == compact->cur_prefix_) {
// Apply the compaction filter V2 to all the kv pairs sharing
// the same prefix
if (ikey.type == kTypeValue &&
(visible_at_tip || ikey.sequence > latest_snapshot)) {
// Buffer all keys sharing the same prefix for CompactionFilterV2
// Iterate through keys to check prefix
compact->BufferKeyValueSlices(key, value);
} else {
// buffer ineligible keys
compact->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
continue;
// finish changing values for eligible keys
} else {
// Now prefix changes, this batch is done.
// Call compaction filter on the buffered values to change the value
if (compact->key_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->cur_prefix_ = key_prefix;
}
}
// Merge this batch of data (values + ineligible keys)
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
log_buffer);
if (!status.ok()) {
break;
}
// After writing the kv-pairs, we can safely remove the reference
// to the string buffer and clean them up
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
// Buffer the key that triggers the mismatch in prefix
if (ikey.type == kTypeValue &&
(visible_at_tip || ikey.sequence > latest_snapshot)) {
compact->BufferKeyValueSlices(key, value);
} else {
compact->BufferOtherKeyValueSlices(key, value);
}
backup_input->Next();
if (!backup_input->Valid()) {
// If this is the single last value, we need to merge it.
if (compact->key_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
log_buffer);
compact->CleanupBatchBuffer();
compact->CleanupMergedBuffer();
}
} // done processing all prefix batches
// finish the last batch
if (compact->key_buf_.size() > 0) {
CallCompactionFilterV2(compact, compaction_filter_v2);
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
log_buffer);
} // checking for compaction filter v2
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
false,
log_buffer);
}
if (status.ok() && (shutting_down_.Acquire_Load() || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown started during compaction");
"Database shutdown or Column family drop during compaction");
}
if (status.ok() && compact->builder != nullptr) {
status = FinishCompactionOutputFile(compact, input.get());


@ -38,6 +38,7 @@ class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class CompactionFilterV2;
class DBImpl : public DB {
public:
@ -338,6 +339,24 @@ class DBImpl : public DB {
DeletionState& deletion_state,
LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot,
DeletionState& deletion_state,
bool bottommost_level,
int64_t& imm_micros,
Iterator* input,
CompactionState* compact,
bool is_compaction_v2,
LogBuffer* log_buffer);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
void CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2);
Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact);


@ -313,21 +313,6 @@ void DBIter::MergeValuesNewToOld() {
// when complete, add result to operands and continue.
const Slice& value = iter_->value();
operands.push_front(value.ToString());
while(operands.size() >= 2) {
// Call user associative-merge until it returns false
if (user_merge_operator_->PartialMerge(ikey.user_key,
Slice(operands[0]),
Slice(operands[1]),
&merge_result,
logger_)) {
operands.pop_front();
swap(operands.front(), merge_result);
} else {
// Associative merge returns false ==> stack the operands
break;
}
}
}
}


@ -2582,7 +2582,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
: check_context_(check_context) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
const CompactionFilterContext& context) override {
if (check_context_) {
ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
@ -2599,7 +2599,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
class DeleteFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
const CompactionFilterContext& context) override {
if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else {
@ -2615,7 +2615,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
explicit ChangeFilterFactory() {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>(new ChangeFilter());
}
@ -3660,7 +3660,7 @@ TEST(DBTest, CompactionFilterWithValueChange) {
// verify that all keys now have the new value that
// was set by the compaction process.
for (int i = 0; i < 100000; i++) {
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%010d", i);
std::string newvalue = Get(1, key);
@ -3723,6 +3723,252 @@ TEST(DBTest, CompactionFilterContextManual) {
delete iter;
}
class KeepFilterV2 : public CompactionFilterV2 {
public:
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const override {
cfilter_count++;
std::vector<bool> ret;
new_values->clear();
values_changed->clear();
for (unsigned int i = 0; i < keys.size(); ++i) {
values_changed->push_back(false);
ret.push_back(false);
}
return ret;
}
virtual const char* Name() const override {
return "KeepFilterV2";
}
};
class DeleteFilterV2 : public CompactionFilterV2 {
public:
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const override {
cfilter_count++;
new_values->clear();
values_changed->clear();
std::vector<bool> ret;
for (unsigned int i = 0; i < keys.size(); ++i) {
values_changed->push_back(false);
ret.push_back(true);
}
return ret;
}
virtual const char* Name() const override {
return "DeleteFilterV2";
}
};
class ChangeFilterV2 : public CompactionFilterV2 {
public:
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const override {
std::vector<bool> ret;
new_values->clear();
values_changed->clear();
for (unsigned int i = 0; i < keys.size(); ++i) {
values_changed->push_back(true);
new_values->push_back(NEW_VALUE);
ret.push_back(false);
}
return ret;
}
virtual const char* Name() const override {
return "ChangeFilterV2";
}
};
class KeepFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit KeepFilterFactoryV2(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(new KeepFilterV2());
}
virtual const char* Name() const override {
return "KeepFilterFactoryV2";
}
};
class DeleteFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit DeleteFilterFactoryV2(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(new DeleteFilterV2());
}
virtual const char* Name() const override {
return "DeleteFilterFactoryV2";
}
};
class ChangeFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit ChangeFilterFactoryV2(const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(new ChangeFilterV2());
}
virtual const char* Name() const override {
return "ChangeFilterFactoryV2";
}
};
TEST(DBTest, CompactionFilterV2) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
// extract prefix
auto prefix_extractor = NewFixedPrefixTransform(8);
options.compaction_filter_factory_v2
= std::make_shared<KeepFilterFactoryV2>(prefix_extractor);
// In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction;
options.compaction_style = (rocksdb::CompactionStyle)1;
Reopen(&options);
// Write 100K keys, these are written to a few files in L0.
const std::string value(10, 'x');
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i , i);
Put(key, value);
}
dbfull()->TEST_FlushMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
// All the files are in the lowest level.
int count = 0;
int total = 0;
Iterator* iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst();
ASSERT_OK(iter->status());
while (iter->Valid()) {
ParsedInternalKey ikey(Slice(), 0, kTypeValue);
ikey.sequence = -1;
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
total++;
if (ikey.sequence != 0) {
count++;
}
iter->Next();
}
ASSERT_EQ(total, 100000);
// 1 snapshot only. Since we are using universal compaction,
// the sequence number is cleared for better compression
ASSERT_EQ(count, 1);
delete iter;
// create a new database with the compaction
// filter in such a way that it deletes all keys
options.compaction_filter_factory_v2 =
std::make_shared<DeleteFilterFactoryV2>(prefix_extractor);
options.create_if_missing = true;
DestroyAndReopen(&options);
// write all the keys once again.
for (int i = 0; i < 100000; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i, i);
Put(key, value);
}
dbfull()->TEST_FlushMemTable();
ASSERT_NE(NumTableFilesAtLevel(0), 0);
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
// Scan the entire database to ensure that nothing is left
iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
count = 0;
while (iter->Valid()) {
count++;
iter->Next();
}
ASSERT_EQ(count, 0);
delete iter;
}
TEST(DBTest, CompactionFilterV2WithValueChange) {
Options options = CurrentOptions();
options.num_levels = 3;
options.max_mem_compaction_level = 0;
auto prefix_extractor = NewFixedPrefixTransform(8);
options.compaction_filter_factory_v2 =
std::make_shared<ChangeFilterFactoryV2>(prefix_extractor);
// In a testing environment, we can only flush the application
// compaction filter buffer using universal compaction
option_config_ = kUniversalCompaction;
options.compaction_style = (rocksdb::CompactionStyle)1;
Reopen(&options);
// Write 100K+1 keys. These are written to a few files
// in L0. We do this so that the current snapshot points
// to the 100001st key. The compaction filter is not invoked
// on keys that are visible via a snapshot because we
// cannot delete them anyway.
const std::string value(10, 'x');
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i, i);
Put(key, value);
}
// push all files to lower levels
dbfull()->TEST_FlushMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
// verify that all keys now have the new value that
// was set by the compaction process.
for (int i = 0; i < 100001; i++) {
char key[100];
snprintf(key, sizeof(key), "B%08d%010d", i, i);
std::string newvalue = Get(key);
ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
}
}
TEST(DBTest, SparseMerge) {
do {
Options options = CurrentOptions();


@ -37,7 +37,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
kWriteBufferSize(options.write_buffer_size),
arena_(options.arena_block_size),
table_(options.memtable_factory->CreateMemTableRep(
comparator_, &arena_, options.prefix_extractor.get())),
comparator_, &arena_, options.prefix_extractor.get())),
flush_in_progress_(false),
flush_completed_(false),
file_number_(0),
@ -353,17 +353,6 @@ static bool SaveValue(void* arg, const char* entry) {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
*(s->merge_in_progress) = true;
merge_context->PushOperand(v);
while (merge_context->GetNumOperands() >= 2) {
// Attempt to associative merge. (Returns true if successful)
if (merge_operator->PartialMerge(
s->key->user_key(), merge_context->GetOperand(0),
merge_context->GetOperand(1), &merge_result, s->logger)) {
merge_context->PushPartialMergeResult(merge_result);
} else {
// Stack them because user can't associative merge
break;
}
}
return true;
}
default:


@ -25,12 +25,12 @@ public:
operand_list->clear();
}
}
// Replace the first two operands of merge_result, which are expected be the
// merge results of them.
// Replace all operands with merge_result, which is expected to be the
// merge result of them.
void PushPartialMergeResult(std::string& merge_result) {
assert (operand_list);
operand_list->pop_front();
swap(operand_list->front(), merge_result);
operand_list->clear();
operand_list->push_front(std::move(merge_result));
}
// Push a merge operand
void PushOperand(const Slice& operand_slice) {
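Under the new semantics, a successful partial merge collapses the whole operand list into a single element. A minimal sketch (the setup and the merged string are hypothetical; MergeContext, PushOperand, PushPartialMergeResult, and GetNumOperands are the members declared in this header):

MergeContext context;
context.PushOperand(Slice("op1"));
context.PushOperand(Slice("op2"));
context.PushOperand(Slice("op3"));
// Assume merge_result came from a successful PartialMergeMulti over all three.
std::string merge_result = "op1,op2,op3";
context.PushPartialMergeResult(merge_result);
// The list now holds exactly one operand: the merged result.
assert(context.GetNumOperands() == 1);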


@ -21,7 +21,7 @@ namespace rocksdb {
// operands_ stores the list of merge operands encountered while merging.
// keys_[i] corresponds to operands_[i] for each i.
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool at_bottom, Statistics* stats) {
bool at_bottom, Statistics* stats, int* steps) {
// Get a copy of the internal key, before it's invalidated by iter->Next()
// Also maintain the list of merge operands seen.
keys_.clear();
@ -42,6 +42,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
bool hit_the_next_user_key = false;
ParsedInternalKey ikey;
std::string merge_result; // Temporary value for merge results
if (steps) {
++(*steps);
}
for (iter->Next(); iter->Valid(); iter->Next()) {
assert(operands_.size() >= 1); // Should be invariants!
assert(keys_.size() == operands_.size());
@ -91,6 +94,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// move iter to the next entry (before doing anything else)
iter->Next();
if (steps) {
++(*steps);
}
return;
}
@ -119,6 +125,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// move iter to the next entry
iter->Next();
if (steps) {
++(*steps);
}
return;
}
@ -129,28 +138,13 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
// => then continue because we haven't yet seen a Put/Delete.
assert(!operands_.empty()); // Should have at least one element in it
// keep queuing keys and operands until we either meet a put / delete
// request or have performed a partial merge.
keys_.push_front(iter->key().ToString());
operands_.push_front(iter->value().ToString());
while (operands_.size() >= 2) {
// Returns false when the merge_operator can no longer process it
if (user_merge_operator_->PartialMerge(ikey.user_key,
Slice(operands_[0]),
Slice(operands_[1]),
&merge_result,
logger_)) {
// Merging of operands (associative merge) was successful.
// Replace these frontmost two operands with the merge result
keys_.pop_front();
operands_.pop_front();
swap(operands_.front(), merge_result);
} else {
// Merging of operands (associative merge) returned false.
// The user merge_operator does not know how to merge these operands.
// So we just stack them up until we find a Put/Delete or end of key.
break;
}
if (steps) {
++(*steps);
}
continue;
}
}
@ -192,6 +186,23 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
RecordTick(stats, NUMBER_MERGE_FAILURES);
// Do nothing if not success_. Leave keys() and operands() as they are.
}
} else {
// We haven't seen the beginning of the key nor a Put/Delete.
// Attempt to use the user's associative merge function to
// merge the stacked merge operands into a single operand.
if (operands_.size() >= 2 &&
operands_.size() >= min_partial_merge_operands_ &&
user_merge_operator_->PartialMergeMulti(
ikey.user_key,
std::deque<Slice>(operands_.begin(), operands_.end()),
&merge_result, logger_)) {
// Merging of operands (associative merge) was successful.
// Replace operands with the merge result
operands_.clear();
operands_.push_front(std::move(merge_result));
keys_.erase(keys_.begin(), keys_.end() - 1);
}
}
}


@ -22,12 +22,13 @@ class Statistics;
class MergeHelper {
public:
MergeHelper(const Comparator* user_comparator,
const MergeOperator* user_merge_operator,
Logger* logger,
const MergeOperator* user_merge_operator, Logger* logger,
unsigned min_partial_merge_operands,
bool assert_valid_internal_key)
: user_comparator_(user_comparator),
user_merge_operator_(user_merge_operator),
logger_(logger),
min_partial_merge_operands_(min_partial_merge_operands),
assert_valid_internal_key_(assert_valid_internal_key),
keys_(),
operands_(),
@ -46,7 +47,8 @@ class MergeHelper {
// at_bottom: (IN) true if the iterator covers the bottom level, which means
// we could reach the start of the history of this user key.
void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0,
bool at_bottom = false, Statistics* stats = nullptr);
bool at_bottom = false, Statistics* stats = nullptr,
int* steps = nullptr);
// Query the merge result
// These are valid until the next MergeUntil call
@ -88,6 +90,7 @@ class MergeHelper {
const Comparator* user_comparator_;
const MergeOperator* user_merge_operator_;
Logger* logger_;
unsigned min_partial_merge_operands_;
bool assert_valid_internal_key_; // enforce no internal key corruption?
// the scratch area that holds the result of MergeUntil
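Pulling the pieces together, here is how a caller drives the updated MergeHelper (a sketch assembled from the builder.cc and ProcessKeyValueCompaction call sites above; the surrounding variables are assumed to exist):

// Construction now passes min_partial_merge_operands through to the helper.
MergeHelper merge(cfd->user_comparator(), cfd->options()->merge_operator.get(),
                  options_.info_log.get(),
                  cfd->options()->min_partial_merge_operands,
                  false /* internal key corruption is expected */);

// steps reports how many entries MergeUntil consumed from the iterator, so
// the compaction-v2 path can keep combined_idx in sync with the input:
int steps = 0;
merge.MergeUntil(input, prev_snapshot, bottommost_level,
                 options_.statistics.get(), &steps);
combined_idx = combined_idx - 1 + steps;  // skip the operands MergeUntil consumed
if (merge.IsSuccess()) {
  // A Put/Delete/(end-of-key-range) was found; emit the single merged record.
} else {
  // No final value found; keys() and operands() still hold the stacked operands.
}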


@ -11,6 +11,28 @@
namespace rocksdb {
// The default implementation of PartialMergeMulti, which invokes
// PartialMerge multiple times internally and merges two operands at
// a time.
bool MergeOperator::PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const {
// Simply loop through the operands
std::string temp_value;
Slice temp_slice;
for (const auto& operand : operand_list) {
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
return false;
}
swap(temp_value, *new_value);
temp_slice = Slice(*new_value);
}
// The result will be in *new_value. All merges succeeded.
return true;
}
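Traced out, the default implementation above reduces the operand list left to right; for an operand list [op0, op1, op2] the call pattern is roughly:

  PartialMerge(key, "", op0)  -> r0   // temp_slice starts out empty
  PartialMerge(key, r0, op1)  -> r1
  PartialMerge(key, r1, op2)  -> r2   // r2 ends up in *new_value

If any call returns false, PartialMergeMulti returns false and the operands stay stacked.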
// Given a "real" merge from the library, call the user's
// associative merge function one-by-one on each of the operands.
// NOTE: It is assumed that the client's merge-operator will handle any errors.
@ -46,7 +68,6 @@ bool AssociativeMergeOperator::PartialMerge(
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
return Merge(key, &left_operand, right_operand, new_value, logger);
}


@ -24,10 +24,14 @@ using namespace rocksdb;
namespace {
int numMergeOperatorCalls;
void resetNumMergeOperatorCalls() {
numMergeOperatorCalls = 0;
}
int num_partial_merge_calls;
void resetNumPartialMergeCalls() {
num_partial_merge_calls = 0;
}
}
class CountMergeOperator : public AssociativeMergeOperator {
@ -42,6 +46,11 @@ class CountMergeOperator : public AssociativeMergeOperator {
std::string* new_value,
Logger* logger) const override {
++numMergeOperatorCalls;
if (existing_value == nullptr) {
new_value->assign(value.data(), value.size());
return true;
}
return mergeOperator_->PartialMerge(
key,
*existing_value,
@ -50,6 +59,14 @@ class CountMergeOperator : public AssociativeMergeOperator {
logger);
}
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const {
++num_partial_merge_calls;
return mergeOperator_->PartialMergeMulti(key, operand_list, new_value,
logger);
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
@ -58,16 +75,16 @@ class CountMergeOperator : public AssociativeMergeOperator {
std::shared_ptr<MergeOperator> mergeOperator_;
};
std::shared_ptr<DB> OpenDb(
const string& dbname,
const bool ttl = false,
const unsigned max_successive_merges = 0) {
std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false,
const size_t max_successive_merges = 0,
const uint32_t min_partial_merge_operands = 2) {
DB* db;
StackableDB* sdb;
Options options;
options.create_if_missing = true;
options.merge_operator = std::make_shared<CountMergeOperator>();
options.max_successive_merges = max_successive_merges;
options.min_partial_merge_operands = min_partial_merge_operands;
Status s;
DestroyDB(dbname, Options());
if (ttl) {
@ -306,6 +323,44 @@ void testSuccessiveMerge(
}
}
void testPartialMerge(Counters* counters, DB* db, int max_merge, int min_merge,
int count) {
FlushOptions o;
o.wait = true;
// Test case 1: partial merge should be called when the number of merge
// operands exceeds the threshold.
uint64_t tmp_sum = 0;
resetNumPartialMergeCalls();
for (int i = 1; i <= count; i++) {
counters->assert_add("b", i);
tmp_sum += i;
}
db->Flush(o);
db->CompactRange(nullptr, nullptr);
ASSERT_EQ(tmp_sum, counters->assert_get("b"));
if (count > max_merge) {
// in this case, FullMerge should be called instead.
ASSERT_EQ(num_partial_merge_calls, 0);
} else {
// if count >= min_merge, then partial merge should be called once.
ASSERT_EQ((count >= min_merge), (num_partial_merge_calls == 1));
}
// Test case 2: partial merge should not be called when a put is found.
resetNumPartialMergeCalls();
tmp_sum = 0;
db->Put(rocksdb::WriteOptions(), "c", "10");
for (int i = 1; i <= count; i++) {
counters->assert_add("c", i);
tmp_sum += i;
}
db->Flush(o);
db->CompactRange(nullptr, nullptr);
ASSERT_EQ(tmp_sum, counters->assert_get("c"));
ASSERT_EQ(num_partial_merge_calls, 0);
}
void testSingleBatchSuccessiveMerge(
DB* db,
int max_num_merges,
@ -370,20 +425,40 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
{
cout << "Test merge in memtable... \n";
unsigned maxMerge = 5;
auto db = OpenDb(dbname, use_ttl, maxMerge);
size_t max_merge = 5;
auto db = OpenDb(dbname, use_ttl, max_merge);
MergeBasedCounters counters(db, 0);
testCounters(counters, db.get(), compact);
testSuccessiveMerge(counters, maxMerge, maxMerge * 2);
testSuccessiveMerge(counters, max_merge, max_merge * 2);
testSingleBatchSuccessiveMerge(db.get(), 5, 7);
DestroyDB(dbname, Options());
}
{
cout << "Test Partial-Merge\n";
size_t max_merge = 100;
for (uint32_t min_merge = 5; min_merge < 25; min_merge += 5) {
for (uint32_t count = min_merge - 1; count <= min_merge + 1; count++) {
auto db = OpenDb(dbname, use_ttl, max_merge, min_merge);
MergeBasedCounters counters(db, 0);
testPartialMerge(&counters, db.get(), max_merge, min_merge, count);
DestroyDB(dbname, Options());
}
{
auto db = OpenDb(dbname, use_ttl, max_merge, min_merge);
MergeBasedCounters counters(db, 0);
testPartialMerge(&counters, db.get(), max_merge, min_merge,
min_merge * 10);
DestroyDB(dbname, Options());
}
}
}
}
int main(int argc, char *argv[]) {
//TODO: Make this test like a general rocksdb unit-test
runTest(argc, test::TmpDir() + "/merge_testdb");
runTest(argc, test::TmpDir() + "/merge_testdbttl", true); // Run test on TTL database
printf("Passed all tests!\n");
return 0;
}


@ -424,18 +424,7 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
assert(s->state == kNotFound || s->state == kMerge);
s->state = kMerge;
merge_contex->PushOperand(v);
while (merge_contex->GetNumOperands() >= 2) {
// Attempt to merge operands together via the user's associative merge
if (s->merge_operator->PartialMerge(
s->user_key, merge_contex->GetOperand(0),
merge_contex->GetOperand(1), &merge_result, s->logger)) {
merge_contex->PushPartialMergeResult(merge_result);
} else {
// Associative merge returns false ==> stack the operands
break;
}
}
return true;
return true;
default:
assert(false);


@ -425,8 +425,8 @@ extern rocksdb_mergeoperator_t* rocksdb_mergeoperator_create(
char* (*partial_merge)(
void*,
const char* key, size_t key_length,
const char* left_operand, size_t left_operand_length,
const char* right_operand, size_t right_operand_length,
const char* const* operands_list, const size_t* operands_list_length,
int num_operands,
unsigned char* success, size_t* new_value_length),
void (*delete_value)(
void*,


@ -10,26 +10,27 @@
#define STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
#include <string>
#include <vector>
namespace rocksdb {
class Slice;
class SliceTransform;
// Context information of a compaction run
struct CompactionFilterContext {
// Does this compaction run include all data files
bool is_full_compaction;
// Is this compaction requested by the client (true),
// or is it occurring as an automatic compaction process
bool is_manual_compaction;
};
// CompactionFilter allows an application to modify/delete a key-value at
// the time of compaction.
class CompactionFilter {
public:
// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
bool is_full_compaction;
// Is this compaction requested by the client (true),
// or is it occurring as an automatic compaction process
bool is_manual_compaction;
};
virtual ~CompactionFilter() {}
// The compaction process invokes this
@ -64,14 +65,47 @@ class CompactionFilter {
virtual const char* Name() const = 0;
};
// CompactionFilterV2 buffers kv-pairs sharing the same prefix and lets the
// application layer make individual decisions for all the kv-pairs in the
// buffer.
class CompactionFilterV2 {
public:
virtual ~CompactionFilterV2() {}
// The compaction process invokes this method for all the kv pairs
// sharing the same prefix. It is a "roll-up" version of CompactionFilter.
//
// Each entry in the return vector indicates whether the corresponding kv
// should be dropped from the output of this compaction run. The application
// can inspect the existing values of the keys and make its decision based
// on them.
//
// When a value is to be preserved, the application has the option
// to modify the entry in existing_values and pass it back through an entry
// in new_values. A corresponding values_changed entry needs to be set to
// true in this case. Note that the new_values vector contains only changed
// values, i.e. new_values.size() <= values_changed.size().
//
typedef std::vector<Slice> SliceVector;
virtual std::vector<bool> Filter(int level,
const SliceVector& keys,
const SliceVector& existing_values,
std::vector<std::string>* new_values,
std::vector<bool>* values_changed)
const = 0;
// Returns a name that identifies this compaction filter.
// The name will be printed to LOG file on start up for diagnosis.
virtual const char* Name() const = 0;
};
// Each compaction will create a new CompactionFilter allowing the
// application to know about different compactions
class CompactionFilterFactory {
public:
virtual ~CompactionFilterFactory() { };
virtual ~CompactionFilterFactory() { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) = 0;
const CompactionFilterContext& context) = 0;
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
@ -82,7 +116,7 @@ class CompactionFilterFactory {
class DefaultCompactionFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override {
CreateCompactionFilter(const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>(nullptr);
}
@ -91,6 +125,65 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory {
}
};
// Each compaction will create a new CompactionFilterV2
//
// CompactionFilterFactoryV2 enables the application to specify a prefix and use
// CompactionFilterV2 to filter kv-pairs in batches. Each batch contains all
// the kv-pairs sharing the same prefix.
//
// This is useful for applications that need to group kv-pairs in the
// compaction filter to make a purge/no-purge decision. For example, suppose
// the key prefix is a user id and the rest of the key represents the type of
// value. This batching filter comes in handy if the application's compaction
// filter requires knowledge of all types of values for any user id.
//
class CompactionFilterFactoryV2 {
public:
explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor)
: prefix_extractor_(prefix_extractor) { }
virtual ~CompactionFilterFactoryV2() { }
virtual std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2(
const CompactionFilterContext& context) = 0;
// Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0;
const SliceTransform* GetPrefixExtractor() const {
return prefix_extractor_;
}
void SetPrefixExtractor(const SliceTransform* prefix_extractor) {
prefix_extractor_ = prefix_extractor;
}
private:
// Prefix extractor for compaction filter v2
// Keys sharing the same prefix will be buffered internally.
// Client can implement a Filter callback function to operate on the buffer
const SliceTransform* prefix_extractor_;
};
// Default implementation of CompactionFilterFactoryV2 which does not
// return any filter
class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 {
public:
explicit DefaultCompactionFilterFactoryV2(
const SliceTransform* prefix_extractor)
: CompactionFilterFactoryV2(prefix_extractor) { }
virtual std::unique_ptr<CompactionFilterV2>
CreateCompactionFilterV2(
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilterV2>(nullptr);
}
virtual const char* Name() const override {
return "DefaultCompactionFilterFactoryV2";
}
};
} // namespace rocksdb
#endif // STORAGE_ROCKSDB_INCLUDE_COMPACTION_FILTER_H_
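For a concrete feel of the V2 interface, here is a hedged sketch of a filter that drops kv-pairs with empty values, plus its factory (the class names and the drop-empty policy are invented for illustration; the signatures mirror the declarations above):

class DropEmptyValuesFilterV2 : public CompactionFilterV2 {
 public:
  virtual std::vector<bool> Filter(int level, const SliceVector& keys,
                                   const SliceVector& existing_values,
                                   std::vector<std::string>* new_values,
                                   std::vector<bool>* values_changed)
      const override {
    std::vector<bool> to_delete;
    new_values->clear();
    values_changed->clear();
    for (unsigned int i = 0; i < keys.size(); ++i) {
      // true = drop this kv-pair; no value rewrites, so values_changed is false.
      to_delete.push_back(existing_values[i].empty());
      values_changed->push_back(false);
    }
    return to_delete;
  }
  virtual const char* Name() const override { return "DropEmptyValuesFilterV2"; }
};

class DropEmptyValuesFactoryV2 : public CompactionFilterFactoryV2 {
 public:
  explicit DropEmptyValuesFactoryV2(const SliceTransform* prefix_extractor)
      : CompactionFilterFactoryV2(prefix_extractor) {}
  virtual std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2(
      const CompactionFilterContext& context) override {
    return std::unique_ptr<CompactionFilterV2>(new DropEmptyValuesFilterV2());
  }
  virtual const char* Name() const override {
    return "DropEmptyValuesFactoryV2";
  }
};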


@ -214,7 +214,7 @@ class Env {
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
// Wait for all threads started by StartThread to terminate.
virtual void WaitForJoin() = 0;
virtual void WaitForJoin() {}
// Get thread pool queue length for a specific thread pool.
virtual unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const {
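Because WaitForJoin() now has a no-op default, existing Env subclasses keep compiling unchanged; only environments that actually track threads need to override it. A hedged sketch (the class name is invented; EnvWrapper is the pass-through helper from this header):

class ThreadTrackingEnv : public EnvWrapper {
 public:
  explicit ThreadTrackingEnv(Env* base) : EnvWrapper(base) {}
  virtual void WaitForJoin() override {
    // Join every thread previously launched through StartThread()...
  }
};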


@ -32,9 +32,9 @@ class Logger;
//
// b) MergeOperator - the generic class for all the more abstract / complex
// operations; one method (FullMerge) to merge a Put/Delete value with a
// merge operand; and another method (PartialMerge) that merges two
// operands together. this is especially useful if your key values have a
// complex structure but you would still like to support client-specific
// merge operand; and another method (PartialMerge) that merges multiple
// operands together. This is especially useful if your key values have
// complex structures but you would still like to support client-specific
// incremental updates.
//
// AssociativeMergeOperator is simpler to implement. MergeOperator is simply
@ -80,6 +80,13 @@ class MergeOperator {
// DB::Merge(key, *new_value) would yield the same result as a call
// to DB::Merge(key, left_op) followed by DB::Merge(key, right_op).
//
// The default implementation of PartialMergeMulti will use this function
// as a helper, for backward compatibility. Any successor class of
// MergeOperator should either implement PartialMerge or PartialMergeMulti,
// although implementing PartialMergeMulti is suggested as it is in general
// more effective to merge multiple operands at a time instead of two
// operands at a time.
//
// If it is impossible or infeasible to combine the two operations,
// leave new_value unchanged and return false. The library will
// internally keep track of the operations, and apply them in the
@ -89,12 +96,38 @@ class MergeOperator {
// and simply "return false". For now, the client should simply return
// false in any case it cannot perform partial-merge, regardless of reason.
// If there is corruption in the data, handle it in the FullMerge() function,
// and return false there.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
// and return false there. The default implementation of PartialMerge will
// always return false.
virtual bool PartialMerge(const Slice& key, const Slice& left_operand,
const Slice& right_operand, std::string* new_value,
Logger* logger) const {
return false;
}
// This function performs merge when all the operands are themselves merge
// operation types that you would have passed to a DB::Merge() call in the
// same order (front() first)
// (i.e. DB::Merge(key, operand_list[0]), followed by
// DB::Merge(key, operand_list[1]), ...)
//
// PartialMergeMulti should combine them into a single merge operation that is
// saved into *new_value, and then it should return true. *new_value should
// be constructed such that a call to DB::Merge(key, *new_value) would yield
// the same result as sequential individual calls to DB::Merge(key, operand)
// for each operand in operand_list from front() to back().
//
// The PartialMergeMulti function will be called only when the list of
// operands is long enough. The minimum number of operands that will be
// passed to the function is specified by the "min_partial_merge_operands"
// option.
//
// In the default implementation, PartialMergeMulti will invoke PartialMerge
// multiple times, merging only two operands at each step. Developers should
// either implement PartialMergeMulti, or implement PartialMerge, which then
// serves as the helper for the default PartialMergeMulti.
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is

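To illustrate the new interface, here is a hedged sketch of an operator that benefits from multi-operand merging: a counter whose operands are decimal integers. The class is hypothetical, and the FullMerge signature (operand list as std::deque<std::string>) is assumed from this era of the API; error handling is omitted, so std::stoull will throw on malformed input.

#include <cstdint>
#include <deque>
#include <string>

#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"

namespace {

class CounterOperator : public rocksdb::MergeOperator {
 public:
  virtual bool FullMerge(const rocksdb::Slice& key,
                         const rocksdb::Slice* existing_value,
                         const std::deque<std::string>& operand_list,
                         std::string* new_value,
                         rocksdb::Logger* logger) const override {
    uint64_t total =
        existing_value != nullptr ? std::stoull(existing_value->ToString()) : 0;
    for (const auto& op : operand_list) {
      total += std::stoull(op);
    }
    *new_value = std::to_string(total);
    return true;
  }

  // One pass over the whole batch, instead of pairwise PartialMerge calls.
  virtual bool PartialMergeMulti(const rocksdb::Slice& key,
                                 const std::deque<rocksdb::Slice>& operand_list,
                                 std::string* new_value,
                                 rocksdb::Logger* logger) const override {
    uint64_t total = 0;
    for (const auto& op : operand_list) {
      total += std::stoull(op.ToString());
    }
    *new_value = std::to_string(total);
    return true;
  }

  virtual const char* Name() const override { return "CounterOperator"; }
};

}  // namespace

Summing n operands in one call avoids the n-1 intermediate result strings that the pairwise default implementation would otherwise allocate.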
View File

@ -22,6 +22,7 @@ namespace rocksdb {
class Cache;
class CompactionFilter;
class CompactionFilterFactory;
class CompactionFilterFactoryV2;
class Comparator;
class Env;
enum InfoLogLevel : unsigned char;
@ -124,6 +125,10 @@ struct ColumnFamilyOptions {
// Default: a factory that doesn't provide any object
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
// Version TWO of the compaction_filter_factory
// It supports rolling compaction
std::shared_ptr<CompactionFilterFactoryV2> compaction_filter_factory_v2;
// -------------------
// Parameters that affect performance
@ -499,6 +504,15 @@ struct ColumnFamilyOptions {
// Default: 0 (disabled)
size_t max_successive_merges;
// The number of partial merge operands to accumulate before partial
// merge will be performed. Partial merge will not be called
// if the number of operands to merge is less than min_partial_merge_operands.
//
// If min_partial_merge_operands < 2, then it will be treated as 2.
//
// Default: 2
uint32_t min_partial_merge_operands;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
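A hedged usage sketch of the new option, reusing the hypothetical CounterOperator from the merge_operator.h sketch above; the database path and threshold value are illustrative only:

#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

void OpenWithBatchedPartialMerge() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.merge_operator = std::make_shared<CounterOperator>();
  // Buffer at least 4 operands before PartialMergeMulti is invoked;
  // values below 2 are treated as 2.
  options.min_partial_merge_operands = 4;

  rocksdb::DB* db = nullptr;
  if (rocksdb::DB::Open(options, "/tmp/merge_demo", &db).ok()) {
    db->Merge(rocksdb::WriteOptions(), "hits", "1");
    delete db;
  }
}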
@ -754,6 +768,7 @@ struct Options : public DBOptions, public ColumnFamilyOptions {
// The reason that this is a function that returns "this" instead of a
// constructor is to enable chaining of multiple similar calls in the future.
//
// All data will be in level 0 without any automatic compaction.
// It's recommended to manually call CompactRange(NULL, NULL) before reading
// from the database, because otherwise the read can be very slow.

View File

@ -61,6 +61,16 @@ struct BackupableDBOptions {
// Default: true
bool backup_log_files;
// Max bytes that can be transferred in a second during backup.
// If 0, go as fast as you can
// Default: 0
uint64_t backup_rate_limit;
// Max bytes that can be transferred in a second during restore.
// If 0, go as fast as you can
// Default: 0
uint64_t restore_rate_limit;
void Dump(Logger* logger) const;
explicit BackupableDBOptions(const std::string& _backup_dir,
@ -68,14 +78,18 @@ struct BackupableDBOptions {
bool _share_table_files = true,
Logger* _info_log = nullptr, bool _sync = true,
bool _destroy_old_data = false,
bool _backup_log_files = true)
bool _backup_log_files = true,
uint64_t _backup_rate_limit = 0,
uint64_t _restore_rate_limit = 0)
: backup_dir(_backup_dir),
backup_env(_backup_env),
share_table_files(_share_table_files),
info_log(_info_log),
sync(_sync),
destroy_old_data(_destroy_old_data),
backup_log_files(_backup_log_files) {}
backup_log_files(_backup_log_files),
backup_rate_limit(_backup_rate_limit),
restore_rate_limit(_restore_rate_limit) {}
};
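A hedged sketch of capping backup and restore throughput with the two new fields; the header path is the public location in this era of the tree, and the 1 MB/s and 2 MB/s figures are arbitrary:

#include "utilities/backupable_db.h"

rocksdb::BackupableDBOptions MakeThrottledBackupOptions() {
  rocksdb::BackupableDBOptions opts("/tmp/rocksdb_backup");
  opts.backup_rate_limit = 1024 * 1024;       // at most ~1 MB/s during backup
  opts.restore_rate_limit = 2 * 1024 * 1024;  // at most ~2 MB/s during restore
  return opts;
}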
struct RestoreOptions {

View File

@ -30,10 +30,11 @@ ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()),
merge_operator(nullptr),
compaction_filter(nullptr),
compaction_filter_factory(
std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
write_buffer_size(4<<20),
compaction_filter_factory(std::shared_ptr<CompactionFilterFactory>(
new DefaultCompactionFilterFactory())),
compaction_filter_factory_v2(
new DefaultCompactionFilterFactoryV2(NewFixedPrefixTransform(8))),
write_buffer_size(4 << 20),
max_write_buffer_number(2),
min_write_buffer_number_to_merge(1),
block_cache(nullptr),
@ -72,13 +73,14 @@ ColumnFamilyOptions::ColumnFamilyOptions()
max_sequential_skip_in_iterations(8),
memtable_factory(std::shared_ptr<SkipListFactory>(new SkipListFactory)),
table_factory(
std::shared_ptr<TableFactory>(new BlockBasedTableFactory())),
std::shared_ptr<TableFactory>(new BlockBasedTableFactory())),
inplace_update_support(false),
inplace_update_num_locks(10000),
inplace_callback(nullptr),
memtable_prefix_bloom_bits(0),
memtable_prefix_bloom_probes(6),
max_successive_merges(0) {
max_successive_merges(0),
min_partial_merge_operands(2) {
assert(memtable_factory.get() != nullptr);
}
@ -87,6 +89,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
merge_operator(options.merge_operator),
compaction_filter(options.compaction_filter),
compaction_filter_factory(options.compaction_filter_factory),
compaction_filter_factory_v2(options.compaction_filter_factory_v2),
write_buffer_size(options.write_buffer_size),
max_write_buffer_number(options.max_write_buffer_number),
min_write_buffer_number_to_merge(
@ -140,7 +143,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
inplace_callback(options.inplace_callback),
memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes),
max_successive_merges(options.max_successive_merges) {
max_successive_merges(options.max_successive_merges),
min_partial_merge_operands(options.min_partial_merge_operands) {
assert(memtable_factory.get() != nullptr);
}
@ -292,6 +296,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
merge_operator ? merge_operator->Name() : "None");
Log(log, " Options.compaction_filter_factory: %s",
compaction_filter_factory->Name());
Log(log, " Options.compaction_filter_factory_v2: %s",
compaction_filter_factory_v2->Name());
Log(log, " Options.memtable_factory: %s", memtable_factory->Name());
Log(log, " Options.table_factory: %s", table_factory->Name());
Log(log, " Options.write_buffer_size: %zd", write_buffer_size);
@ -408,6 +414,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
inplace_update_support);
Log(log, " Options.inplace_update_num_locks: %zd",
inplace_update_num_locks);
Log(log, " Options.min_partial_merge_operands: %u",
min_partial_merge_operands);
// TODO: easier config for bloom (maybe based on avg key/value size)
Log(log, " Options.memtable_prefix_bloom_bits: %d",
memtable_prefix_bloom_bits);

View File

@ -26,17 +26,60 @@
namespace rocksdb {
namespace {
class RateLimiter {
public:
RateLimiter(Env* env, uint64_t max_bytes_per_second, uint64_t bytes_per_check)
: env_(env),
max_bytes_per_second_(max_bytes_per_second),
bytes_per_check_(bytes_per_check),
micros_start_time_(env->NowMicros()),
bytes_since_start_(0) {}
void ReportAndWait(uint64_t bytes_since_last_call) {
bytes_since_start_ += bytes_since_last_call;
if (bytes_since_start_ < bytes_per_check_) {
// not enough bytes to be rate-limited
return;
}
uint64_t now = env_->NowMicros();
uint64_t interval = now - micros_start_time_;
uint64_t should_take_micros =
(bytes_since_start_ * kMicrosInSecond) / max_bytes_per_second_;
if (should_take_micros > interval) {
env_->SleepForMicroseconds(should_take_micros - interval);
now = env_->NowMicros();
}
// reset interval
micros_start_time_ = now;
bytes_since_start_ = 0;
}
private:
Env* env_;
uint64_t max_bytes_per_second_;
uint64_t bytes_per_check_;
uint64_t micros_start_time_;
uint64_t bytes_since_start_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};
} // namespace
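The pacing rule above is: after each chunk, compute how long the bytes reported so far should have taken at the configured rate, and sleep off any surplus. A standalone restatement with illustrative numbers (not the patch's code):

#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t kMicrosInSecond = 1000 * 1000ULL;
  uint64_t max_bytes_per_second = 1024 * 1024;   // 1 MB/s limit
  uint64_t bytes_since_start = 5 * 1024 * 1024;  // 5 MB reported so far
  uint64_t interval = 3 * kMicrosInSecond;       // 3 s of wall time elapsed
  uint64_t should_take_micros =
      (bytes_since_start * kMicrosInSecond) / max_bytes_per_second;  // 5 s
  if (should_take_micros > interval) {
    // The copy ran ahead of schedule; sleep the remaining 2,000,000 us.
    std::printf("sleep %llu us\n",
                static_cast<unsigned long long>(should_take_micros - interval));
  }
  return 0;
}

Note that CreateNewBackup() below sizes copy_file_buffer_size_ to one tenth of the rate limit, so the limiter is consulted roughly ten times per second.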
void BackupableDBOptions::Dump(Logger* logger) const {
Log(logger, " Options.backup_dir: %s", backup_dir.c_str());
Log(logger, " Options.backup_env: %p", backup_env);
Log(logger, "Options.share_table_files: %d",
Log(logger, " Options.backup_dir: %s", backup_dir.c_str());
Log(logger, " Options.backup_env: %p", backup_env);
Log(logger, " Options.share_table_files: %d",
static_cast<int>(share_table_files));
Log(logger, " Options.info_log: %p", info_log);
Log(logger, " Options.sync: %d", static_cast<int>(sync));
Log(logger, " Options.destroy_old_data: %d",
Log(logger, " Options.info_log: %p", info_log);
Log(logger, " Options.sync: %d", static_cast<int>(sync));
Log(logger, " Options.destroy_old_data: %d",
static_cast<int>(destroy_old_data));
Log(logger, " Options.backup_log_files: %d",
Log(logger, " Options.backup_log_files: %d",
static_cast<int>(backup_log_files));
Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit);
Log(logger, "Options.restore_rate_limit: %" PRIu64, restore_rate_limit);
}
// -------- BackupEngineImpl class ---------
@ -170,6 +213,7 @@ class BackupEngineImpl : public BackupEngine {
Env* src_env,
Env* dst_env,
bool sync,
RateLimiter* rate_limiter,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0);
@ -179,6 +223,7 @@ class BackupEngineImpl : public BackupEngine {
bool shared,
const std::string& src_dir,
const std::string& src_fname, // starts with "/"
RateLimiter* rate_limiter,
uint64_t size_limit = 0);
Status CalculateChecksum(const std::string& src,
@ -209,7 +254,8 @@ class BackupEngineImpl : public BackupEngine {
unique_ptr<Directory> meta_directory_;
unique_ptr<Directory> private_directory_;
static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB
static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
size_t copy_file_buffer_size_;
};
BackupEngine* BackupEngine::NewBackupEngine(
@ -222,9 +268,8 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
: stop_backup_(false),
options_(options),
db_env_(db_env),
backup_env_(options.backup_env != nullptr ? options.backup_env
: db_env_) {
backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
copy_file_buffer_size_(kDefaultCopyFileBufferSize) {
options_.Dump(options_.info_log);
// create all the dirs we need
@ -350,6 +395,13 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
s = backup_env_->CreateDir(
GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)));
unique_ptr<RateLimiter> rate_limiter;
if (options_.backup_rate_limit > 0) {
copy_file_buffer_size_ = options_.backup_rate_limit / 10;
rate_limiter.reset(new RateLimiter(db_env_, options_.backup_rate_limit,
copy_file_buffer_size_));
}
// copy live_files
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
uint64_t number;
@ -371,6 +423,7 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
options_.share_table_files && type == kTableFile,
db->GetName(), /* src_dir */
live_files[i], /* src_fname */
rate_limiter.get(),
(type == kDescriptorFile) ? manifest_file_size : 0);
}
@ -383,7 +436,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) {
&new_backup,
false, /* not shared */
db->GetOptions().wal_dir,
live_wal_files[i]->PathName());
live_wal_files[i]->PathName(),
rate_limiter.get());
}
}
@ -527,6 +581,12 @@ Status BackupEngineImpl::RestoreDBFromBackup(
DeleteChildren(db_dir);
}
unique_ptr<RateLimiter> rate_limiter;
if (options_.restore_rate_limit > 0) {
copy_file_buffer_size_ = options_.restore_rate_limit / 10;
rate_limiter.reset(new RateLimiter(db_env_, options_.restore_rate_limit,
copy_file_buffer_size_));
}
Status s;
for (auto& file : backup.GetFiles()) {
std::string dst;
@ -551,7 +611,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
uint32_t checksum_value;
s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false,
nullptr /* size */, &checksum_value);
rate_limiter.get(), nullptr /* size */, &checksum_value);
if (!s.ok()) {
break;
}
@ -631,7 +691,8 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
Status BackupEngineImpl::CopyFile(const std::string& src,
const std::string& dst, Env* src_env,
Env* dst_env, bool sync, uint64_t* size,
Env* dst_env, bool sync,
RateLimiter* rate_limiter, uint64_t* size,
uint32_t* checksum_value,
uint64_t size_limit) {
Status s;
@ -684,6 +745,9 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
data.size());
}
s = dst_file->Append(data);
if (rate_limiter != nullptr) {
rate_limiter->ReportAndWait(data.size());
}
} while (s.ok() && data.size() > 0 && size_limit > 0);
if (s.ok() && sync) {
@ -697,6 +761,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
bool shared, const std::string& src_dir,
const std::string& src_fname,
RateLimiter* rate_limiter,
uint64_t size_limit) {
assert(src_fname.size() > 0 && src_fname[0] == '/');
@ -732,6 +797,7 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup,
db_env_,
backup_env_,
options_.sync,
rate_limiter,
&size,
&checksum_value,
size_limit);

View File

@ -299,13 +299,16 @@ class FileManager : public EnvWrapper {
}; // FileManager
// utility functions
static void FillDB(DB* db, int from, int to) {
static size_t FillDB(DB* db, int from, int to) {
size_t bytes_written = 0;
for (int i = from; i < to; ++i) {
std::string key = "testkey" + std::to_string(i);
std::string value = "testvalue" + std::to_string(i);
bytes_written += key.size() + value.size();
ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
}
return bytes_written;
}
static void AssertExists(DB* db, int from, int to) {
@ -800,8 +803,8 @@ TEST(BackupableDBTest, DeleteTmpFiles) {
}
TEST(BackupableDBTest, KeepLogFiles) {
// basically infinite
backupable_options_->backup_log_files = false;
// basically infinite
options_.WAL_ttl_seconds = 24 * 60 * 60;
OpenBackupableDB(true);
FillDB(db_.get(), 0, 100);
@ -820,6 +823,47 @@ TEST(BackupableDBTest, KeepLogFiles) {
AssertBackupConsistency(0, 0, 500, 600, true);
}
TEST(BackupableDBTest, RateLimiting) {
uint64_t const MB = 1024 * 1024;
size_t const kMicrosPerSec = 1000 * 1000LL;
std::vector<std::pair<uint64_t, uint64_t>> limits(
{{MB, 5 * MB}, {2 * MB, 3 * MB}});
for (const auto& limit : limits) {
// destroy old data
DestroyDB(dbname_, Options());
backupable_options_->backup_rate_limit = limit.first;
backupable_options_->restore_rate_limit = limit.second;
options_.compression = kNoCompression;
OpenBackupableDB(true);
size_t bytes_written = FillDB(db_.get(), 0, 100000);
auto start_backup = env_->NowMicros();
ASSERT_OK(db_->CreateNewBackup(false));
auto backup_time = env_->NowMicros() - start_backup;
auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) /
backupable_options_->backup_rate_limit;
ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time);
ASSERT_LT(backup_time, 1.3 * rate_limited_backup_time);
CloseBackupableDB();
OpenRestoreDB();
auto start_restore = env_->NowMicros();
ASSERT_OK(restore_db_->RestoreDBFromLatestBackup(dbname_, dbname_));
auto restore_time = env_->NowMicros() - start_restore;
CloseRestoreDB();
auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) /
backupable_options_->restore_rate_limit;
ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time);
ASSERT_LT(restore_time, 1.3 * rate_limited_restore_time);
AssertBackupConsistency(0, 0, 100000, 100010);
}
}
} // anon namespace
} // namespace rocksdb

View File

@ -1,3 +1,8 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <memory>
#include "rocksdb/slice.h"
#include "rocksdb/merge_operator.h"
@ -38,6 +43,15 @@ class PutOperator : public MergeOperator {
return true;
}
using MergeOperator::PartialMergeMulti;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const
override {
new_value->assign(operand_list.back().data(), operand_list.back().size());
return true;
}
virtual const char* Name() const override {
return "PutOperator";
}
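Because put semantics make every earlier operand irrelevant, PartialMergeMulti above can discard all but operand_list.back(). A hedged usage sketch (db is assumed to be an open handle with PutOperator installed as the merge operator):

#include "rocksdb/db.h"

void MergeBehavesLikePut(rocksdb::DB* db) {
  db->Merge(rocksdb::WriteOptions(), "k", "v1");
  db->Merge(rocksdb::WriteOptions(), "k", "v2");
  db->Merge(rocksdb::WriteOptions(), "k", "v3");  // only this value survives
}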

View File

@ -6,6 +6,7 @@
#include "stringappend2.h"
#include <memory>
#include <string>
#include <assert.h>
#include "rocksdb/slice.h"
@ -61,31 +62,39 @@ bool StringAppendTESTOperator::FullMerge(
return true;
}
bool StringAppendTESTOperator::PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
bool StringAppendTESTOperator::PartialMergeMulti(
const Slice& key, const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const {
return false;
}
// A version of PartialMerge that actually performs "partial merging".
// Use this to simulate the exact behaviour of the StringAppendOperator.
bool StringAppendTESTOperator::_AssocPartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
// Clear the *new_value for writing.
bool StringAppendTESTOperator::_AssocPartialMergeMulti(
const Slice& key, const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const {
// Clear the *new_value for writing
assert(new_value);
new_value->clear();
assert(operand_list.size() >= 2);
// Generic append
// Reserve correct size for *new_value, and apply concatenation.
new_value->reserve(left_operand.size() + 1 + right_operand.size());
new_value->assign(left_operand.data(), left_operand.size());
new_value->append(1,delim_);
new_value->append(right_operand.data(), right_operand.size());
// Determine and reserve correct size for *new_value.
size_t size = 0;
for (const auto& operand : operand_list) {
size += operand.size();
}
size += operand_list.size() - 1; // Delimiters
new_value->reserve(size);
// Apply concatenation
new_value->assign(operand_list.front().data(), operand_list.front().size());
for (std::deque<Slice>::const_iterator it = operand_list.begin() + 1;
it != operand_list.end(); ++it) {
new_value->append(1, delim_);
new_value->append(it->data(), it->size());
}
return true;
}
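A worked restatement of the concatenation above (illustrative, not from the patch): for operands {"a", "bb", "c"} and delimiter ',', the reserved size is 4 payload bytes plus 2 delimiters, and the result is "a,bb,c". Like the size() >= 2 assertion above, the helper assumes a non-empty operand list:

#include <deque>
#include <string>

std::string JoinWithDelim(const std::deque<std::string>& ops, char delim) {
  size_t size = 0;
  for (const auto& op : ops) {
    size += op.size();
  }
  size += ops.size() - 1;  // one delimiter between each adjacent pair
  std::string out;
  out.reserve(size);
  out = ops.front();
  for (auto it = ops.begin() + 1; it != ops.end(); ++it) {
    out.push_back(delim);
    out += *it;
  }
  return out;  // JoinWithDelim({"a", "bb", "c"}, ',') == "a,bb,c"
}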

View File

@ -11,6 +11,9 @@
*/
#pragma once
#include <deque>
#include <string>
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
@ -18,8 +21,8 @@ namespace rocksdb {
class StringAppendTESTOperator : public MergeOperator {
public:
StringAppendTESTOperator(char delim_char); /// Constructor with delimiter
// Constructor with delimiter
explicit StringAppendTESTOperator(char delim_char);
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
@ -27,22 +30,19 @@ class StringAppendTESTOperator : public MergeOperator {
std::string* new_value,
Logger* logger) const override;
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override;
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const
override;
virtual const char* Name() const override;
private:
// A version of PartialMerge that actually performs "partial merging".
// Use this to simulate the exact behaviour of the StringAppendOperator.
bool _AssocPartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const;
bool _AssocPartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const;
char delim_; // The delimiter is inserted between elements

View File

@ -3,6 +3,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <deque>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/compaction_filter.h"
@ -198,7 +201,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
user_comp_filter_factory_(comp_filter_factory) { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) {
const CompactionFilterContext& context) {
return std::unique_ptr<TtlCompactionFilter>(
new TtlCompactionFilter(
ttl_,
@ -277,24 +280,27 @@ class TtlMergeOperator : public MergeOperator {
}
}
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override {
virtual bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const
override {
const uint32_t ts_len = DBWithTTL::kTSLength;
std::deque<Slice> operands_without_ts;
if (left_operand.size() < ts_len || right_operand.size() < ts_len) {
Log(logger, "Error: Could not remove timestamp from value.");
return false;
for (const auto& operand : operand_list) {
if (operand.size() < ts_len) {
Log(logger, "Error: Could not remove timestamp from value.");
return false;
}
operands_without_ts.push_back(
Slice(operand.data(), operand.size() - ts_len));
}
// Apply the user partial-merge operator (store result in *new_value)
assert(new_value);
Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len);
Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len);
if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
new_value, logger)) {
if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
logger)) {
return false;
}
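A hedged sketch of the suffix handling above: each TTL-wrapped operand carries a fixed-width timestamp of DBWithTTL::kTSLength bytes after the user payload, which must be peeled off before delegating to the user's PartialMergeMulti. The helper name is hypothetical:

#include <cstdint>
#include "rocksdb/slice.h"

rocksdb::Slice StripTimestampSuffix(const rocksdb::Slice& operand,
                                    uint32_t ts_len) {
  // Caller has already verified operand.size() >= ts_len.
  return rocksdb::Slice(operand.data(), operand.size() - ts_len);
}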

View File

@ -285,7 +285,7 @@ class TtlTest {
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(
const CompactionFilter::Context& context) override {
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>(
new TestFilter(kSampleSize_, kNewValue_));
}