05e8854085
Summary: This diff introduces a new Merge operation into rocksdb. The purpose of this review is mostly getting feedback from the team (everyone please) on the design. Please focus on the four files under include/leveldb/, as they spell the client visible interface change. include/leveldb/db.h include/leveldb/merge_operator.h include/leveldb/options.h include/leveldb/write_batch.h Please go over local/my_test.cc carefully, as it is a concerete use case. Please also review the impelmentation files to see if the straw man implementation makes sense. Note that, the diff does pass all make check and truly supports forward iterator over db and a version of Get that's based on iterator. Future work: - Integration with compaction - A raw Get implementation I am working on a wiki that explains the design and implementation choices, but coding comes just naturally and I think it might be a good idea to share the code earlier. The code is heavily commented. Test Plan: run all local tests Reviewers: dhruba, heyongqiang Reviewed By: dhruba CC: leveldb, zshao, sheki, emayanke, MarkCallaghan Differential Revision: https://reviews.facebook.net/D9651
115 lines
4.0 KiB
C++
115 lines
4.0 KiB
C++
#include "merge_helper.h"
|
|
#include "db/dbformat.h"
|
|
#include "leveldb/comparator.h"
|
|
#include "leveldb/db.h"
|
|
#include "leveldb/merge_operator.h"
|
|
#include <string>
|
|
#include <stdio.h>
|
|
|
|
namespace leveldb {
|
|
|
|
// PRE: iter points to the first merge type entry
|
|
// POST: iter points to the first entry beyond the merge process (or the end)
|
|
// key_, value_ are updated to reflect the merge result
|
|
void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
|
bool at_bottom) {
|
|
// get a copy of the internal key, before it's invalidated by iter->Next()
|
|
key_.assign(iter->key().data(), iter->key().size());
|
|
// we need to parse the internal key again as the parsed key is
|
|
// backed by the internal key!
|
|
ParsedInternalKey orig_ikey;
|
|
// Assume no internal key corruption as it has been successfully parsed
|
|
// by the caller.
|
|
// TODO: determine a good alternative of assert (exception?)
|
|
ParseInternalKey(key_, &orig_ikey);
|
|
std::string operand(iter->value().data(), iter->value().size());
|
|
|
|
bool hit_the_next_user_key = false;
|
|
ParsedInternalKey ikey;
|
|
for (iter->Next(); iter->Valid(); iter->Next()) {
|
|
if (!ParseInternalKey(iter->key(), &ikey)) {
|
|
// stop at corrupted key
|
|
if (assert_valid_internal_key_) {
|
|
assert(!"corrupted internal key is not expected");
|
|
}
|
|
break;
|
|
}
|
|
|
|
if (user_comparator_->Compare(ikey.user_key, orig_ikey.user_key) != 0) {
|
|
// hit a different user key, stop right here
|
|
hit_the_next_user_key = true;
|
|
break;
|
|
}
|
|
|
|
if (stop_before && ikey.sequence <= stop_before) {
|
|
// hit an entry that's visible by the previous snapshot, can't touch that
|
|
break;
|
|
}
|
|
|
|
if (kTypeDeletion == ikey.type) {
|
|
// hit a delete
|
|
// => merge nullptr with operand
|
|
// => change the entry type to kTypeValue
|
|
// We are done!
|
|
user_merge_operator_->Merge(ikey.user_key, nullptr, operand,
|
|
&value_, logger_);
|
|
orig_ikey.type = kTypeValue;
|
|
UpdateInternalKey(&key_[0], key_.size(),
|
|
orig_ikey.sequence, orig_ikey.type);
|
|
// move iter to the next entry
|
|
iter->Next();
|
|
return;
|
|
}
|
|
|
|
if (kTypeValue == ikey.type) {
|
|
// hit a put
|
|
// => merge the put value with operand
|
|
// => change the entry type to kTypeValue
|
|
// We are done!
|
|
const Slice value = iter->value();
|
|
user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand),
|
|
&value_, logger_);
|
|
orig_ikey.type = kTypeValue;
|
|
UpdateInternalKey(&key_[0], key_.size(),
|
|
orig_ikey.sequence, orig_ikey.type);
|
|
// move iter to the next entry
|
|
iter->Next();
|
|
return;
|
|
}
|
|
|
|
if (kTypeMerge == ikey.type) {
|
|
// hit a merge
|
|
// => merge the value with operand.
|
|
// => put the result back to operand and continue
|
|
const Slice value = iter->value();
|
|
user_merge_operator_->Merge(ikey.user_key, &value, operand,
|
|
&value_, logger_);
|
|
swap(value_, operand);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// We have seen the root history of this key if we are at the
|
|
// bottem level and exhausted all internal keys of this user key
|
|
// NOTE: !iter->Valid() does not necessarily mean we hit the
|
|
// beginning of a user key, as versions of a user key might be
|
|
// split into multiple files and some files might not be included
|
|
// in the merge.
|
|
bool seen_the_beginning = hit_the_next_user_key && at_bottom;
|
|
|
|
if (seen_the_beginning) {
|
|
// do a final merge with nullptr as the existing value and say
|
|
// bye to the merge type (it's now converted to a Put)
|
|
assert(kTypeMerge == orig_ikey.type);
|
|
user_merge_operator_->Merge(orig_ikey.user_key, nullptr, operand,
|
|
&value_, logger_);
|
|
orig_ikey.type = kTypeValue;
|
|
UpdateInternalKey(&key_[0], key_.size(),
|
|
orig_ikey.sequence, orig_ikey.type);
|
|
} else {
|
|
swap(value_, operand);
|
|
}
|
|
}
|
|
|
|
} // namespace leveldb
|