Helper function to time Merges
Summary: Remove duplicate code. If this diff looks good, I will cleanup other call sites as well. Test Plan: unit tests Reviewers: rven, yhchiang, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D37761
This commit is contained in:
parent
a087f80e9d
commit
d6f39c5ae3
@ -16,6 +16,40 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
|
// TODO(agiardullo): Clean up merge callsites to use this func
|
||||||
|
Status MergeHelper::TimedFullMerge(const Slice& key, const Slice* value,
|
||||||
|
const std::deque<std::string>& operands,
|
||||||
|
const MergeOperator* merge_operator,
|
||||||
|
Statistics* statistics, Env* env,
|
||||||
|
Logger* logger, std::string* result) {
|
||||||
|
if (operands.size() == 0) {
|
||||||
|
result->assign(value->data(), value->size());
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (merge_operator == nullptr) {
|
||||||
|
return Status::NotSupported("Provide a merge_operator when opening DB");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup to time the merge
|
||||||
|
StopWatchNano timer(env, statistics != nullptr);
|
||||||
|
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
||||||
|
|
||||||
|
// Do the merge
|
||||||
|
bool success =
|
||||||
|
merge_operator->FullMerge(key, value, operands, result, logger);
|
||||||
|
|
||||||
|
RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME,
|
||||||
|
env != nullptr ? timer.ElapsedNanos() : 0);
|
||||||
|
|
||||||
|
if (!success) {
|
||||||
|
RecordTick(statistics, NUMBER_MERGE_FAILURES);
|
||||||
|
return Status::Corruption("Error: Could not perform merge.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
// PRE: iter points to the first merge type entry
|
// PRE: iter points to the first merge type entry
|
||||||
// POST: iter points to the first entry beyond the merge process (or the end)
|
// POST: iter points to the first entry beyond the merge process (or the end)
|
||||||
// keys_, operands_ are updated to reflect the merge result.
|
// keys_, operands_ are updated to reflect the merge result.
|
||||||
@ -81,25 +115,20 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||||||
// => store result in operands_.back() (and update keys_.back())
|
// => store result in operands_.back() (and update keys_.back())
|
||||||
// => change the entry type to kTypeValue for keys_.back()
|
// => change the entry type to kTypeValue for keys_.back()
|
||||||
// We are done! Return a success if the merge passes.
|
// We are done! Return a success if the merge passes.
|
||||||
{
|
|
||||||
StopWatchNano timer(env_, stats != nullptr);
|
Status s = TimedFullMerge(ikey.user_key, nullptr, operands_,
|
||||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
user_merge_operator_, stats, env_, logger_,
|
||||||
success_ = user_merge_operator_->FullMerge(
|
&merge_result);
|
||||||
ikey.user_key, nullptr, operands_, &merge_result, logger_);
|
|
||||||
RecordTick(stats, MERGE_OPERATION_TOTAL_TIME,
|
|
||||||
env_ != nullptr ? timer.ElapsedNanos() : 0);
|
|
||||||
}
|
|
||||||
// We store the result in keys_.back() and operands_.back()
|
// We store the result in keys_.back() and operands_.back()
|
||||||
// if nothing went wrong (i.e.: no operand corruption on disk)
|
// if nothing went wrong (i.e.: no operand corruption on disk)
|
||||||
if (success_) {
|
if (s.ok()) {
|
||||||
std::string& original_key =
|
std::string& original_key =
|
||||||
keys_.back(); // The original key encountered
|
keys_.back(); // The original key encountered
|
||||||
orig_ikey.type = kTypeValue;
|
orig_ikey.type = kTypeValue;
|
||||||
UpdateInternalKey(&original_key[0], original_key.size(),
|
UpdateInternalKey(&original_key[0], original_key.size(),
|
||||||
orig_ikey.sequence, orig_ikey.type);
|
orig_ikey.sequence, orig_ikey.type);
|
||||||
swap(operands_.back(), merge_result);
|
swap(operands_.back(), merge_result);
|
||||||
} else {
|
|
||||||
RecordTick(stats, NUMBER_MERGE_FAILURES);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// move iter to the next entry (before doing anything else)
|
// move iter to the next entry (before doing anything else)
|
||||||
@ -117,25 +146,19 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|||||||
// => change the entry type to kTypeValue for keys_.back()
|
// => change the entry type to kTypeValue for keys_.back()
|
||||||
// We are done! Success!
|
// We are done! Success!
|
||||||
const Slice val = iter->value();
|
const Slice val = iter->value();
|
||||||
{
|
Status s =
|
||||||
StopWatchNano timer(env_, stats != nullptr);
|
TimedFullMerge(ikey.user_key, &val, operands_, user_merge_operator_,
|
||||||
PERF_TIMER_GUARD(merge_operator_time_nanos);
|
stats, env_, logger_, &merge_result);
|
||||||
success_ = user_merge_operator_->FullMerge(
|
|
||||||
ikey.user_key, &val, operands_, &merge_result, logger_);
|
|
||||||
RecordTick(stats, MERGE_OPERATION_TOTAL_TIME,
|
|
||||||
env_ != nullptr ? timer.ElapsedNanos() : 0);
|
|
||||||
}
|
|
||||||
// We store the result in keys_.back() and operands_.back()
|
// We store the result in keys_.back() and operands_.back()
|
||||||
// if nothing went wrong (i.e.: no operand corruption on disk)
|
// if nothing went wrong (i.e.: no operand corruption on disk)
|
||||||
if (success_) {
|
if (s.ok()) {
|
||||||
std::string& original_key =
|
std::string& original_key =
|
||||||
keys_.back(); // The original key encountered
|
keys_.back(); // The original key encountered
|
||||||
orig_ikey.type = kTypeValue;
|
orig_ikey.type = kTypeValue;
|
||||||
UpdateInternalKey(&original_key[0], original_key.size(),
|
UpdateInternalKey(&original_key[0], original_key.size(),
|
||||||
orig_ikey.sequence, orig_ikey.type);
|
orig_ikey.sequence, orig_ikey.type);
|
||||||
swap(operands_.back(), merge_result);
|
swap(operands_.back(), merge_result);
|
||||||
} else {
|
|
||||||
RecordTick(stats, NUMBER_MERGE_FAILURES);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// move iter to the next entry
|
// move iter to the next entry
|
||||||
|
@ -35,6 +35,15 @@ class MergeHelper {
|
|||||||
operands_(),
|
operands_(),
|
||||||
success_(false) {}
|
success_(false) {}
|
||||||
|
|
||||||
|
// Wrapper around MergeOperator::FullMerge() that records perf statistics.
|
||||||
|
// Result of merge will be written to result if status returned is OK.
|
||||||
|
// If operands is empty, the value will simply be copied to result.
|
||||||
|
static Status TimedFullMerge(const Slice& key, const Slice* value,
|
||||||
|
const std::deque<std::string>& operands,
|
||||||
|
const MergeOperator* merge_operator,
|
||||||
|
Statistics* statistics, Env* env, Logger* logger,
|
||||||
|
std::string* result);
|
||||||
|
|
||||||
// Merge entries until we hit
|
// Merge entries until we hit
|
||||||
// - a corrupted key
|
// - a corrupted key
|
||||||
// - a Put/Delete,
|
// - a Put/Delete,
|
||||||
|
Loading…
Reference in New Issue
Block a user