Merge operator fixes part 1.
Summary: -Added null checks and revisions to DBIter::MergeValuesNewToOld() -Added DBIter test to stringappend_test -Major fix with Merge and TTL More plans for fixes later. Test Plan: -make clean; make stringappend_test -j 32; ./stringappend_test -make all check; Reviewers: haobo, emayanke, vamsi, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D12315
This commit is contained in:
parent
1635ea06c2
commit
e1346968d8
2
Makefile
2
Makefile
@ -20,7 +20,7 @@ include build_config.mk
|
||||
|
||||
WARNING_FLAGS = -Wall -Werror
|
||||
CFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CCFLAGS) $(OPT)
|
||||
CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x
|
||||
CXXFLAGS += -g $(WARNING_FLAGS) -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) -std=gnu++0x -Woverloaded-virtual
|
||||
|
||||
LDFLAGS += $(PLATFORM_LDFLAGS)
|
||||
|
||||
|
@ -212,11 +212,8 @@ void DBIter::FindNextUserEntry(bool skipping) {
|
||||
SaveKey(ikey.user_key, &saved_key_);
|
||||
current_entry_is_merged_ = true;
|
||||
valid_ = true;
|
||||
// Go to a different state machine
|
||||
MergeValuesNewToOld();
|
||||
// TODO: what if !iter_->Valid()
|
||||
MergeValuesNewToOld(); // Go to a different state machine
|
||||
return;
|
||||
break;
|
||||
case kTypeLogData:
|
||||
assert(false);
|
||||
break;
|
||||
@ -235,7 +232,11 @@ void DBIter::FindNextUserEntry(bool skipping) {
|
||||
// POST: saved_value_ has the merged value for the user key
|
||||
// iter_ points to the next entry (or invalid)
|
||||
void DBIter::MergeValuesNewToOld() {
|
||||
// TODO: Is there a way to unite with MergeHelper or other similar code?
|
||||
if (!user_merge_operator_) {
|
||||
Log(logger_, "Options::merge_operator is null.");
|
||||
throw std::logic_error("DBIter::MergeValuesNewToOld() with"
|
||||
" Options::merge_operator null");
|
||||
}
|
||||
|
||||
// Start the merge process by pushing the first operand
|
||||
std::deque<std::string> operands;
|
||||
@ -266,8 +267,8 @@ void DBIter::MergeValuesNewToOld() {
|
||||
// final result in saved_value_. We are done!
|
||||
// ignore corruption if there is any.
|
||||
const Slice value = iter_->value();
|
||||
user_merge_operator_->Merge(ikey.user_key, &value, operands,
|
||||
&saved_value_, logger_.get());
|
||||
user_merge_operator_->FullMerge(ikey.user_key, &value, operands,
|
||||
&saved_value_, logger_.get());
|
||||
// iter_ is positioned after put
|
||||
iter_->Next();
|
||||
return;
|
||||
@ -300,8 +301,8 @@ void DBIter::MergeValuesNewToOld() {
|
||||
// a deletion marker.
|
||||
// feed null as the existing value to the merge operator, such that
|
||||
// client can differentiate this scenario and do things accordingly.
|
||||
user_merge_operator_->Merge(ikey.user_key, nullptr, operands,
|
||||
&saved_value_, logger_.get());
|
||||
user_merge_operator_->FullMerge(saved_key_, nullptr, operands,
|
||||
&saved_value_, logger_.get());
|
||||
}
|
||||
|
||||
void DBIter::Prev() {
|
||||
|
@ -167,8 +167,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
*s = Status::OK();
|
||||
if (merge_in_progress) {
|
||||
assert(merge_operator);
|
||||
if (!merge_operator->Merge(key.user_key(), &v, *operands,
|
||||
value, logger.get())) {
|
||||
if (!merge_operator->FullMerge(key.user_key(), &v, *operands,
|
||||
value, logger.get())) {
|
||||
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
|
||||
*s = Status::Corruption("Error: Could not perform merge.");
|
||||
}
|
||||
@ -181,8 +181,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
|
||||
if (merge_in_progress) {
|
||||
assert(merge_operator);
|
||||
*s = Status::OK();
|
||||
if (!merge_operator->Merge(key.user_key(), nullptr, *operands,
|
||||
value, logger.get())) {
|
||||
if (!merge_operator->FullMerge(key.user_key(), nullptr, *operands,
|
||||
value, logger.get())) {
|
||||
RecordTick(options.statistics, NUMBER_MERGE_FAILURES);
|
||||
*s = Status::Corruption("Error: Could not perform merge.");
|
||||
}
|
||||
|
@ -67,9 +67,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
// => store result in operands_.back() (and update keys_.back())
|
||||
// => change the entry type to kTypeValue for keys_.back()
|
||||
// We are done! Return a success if the merge passes.
|
||||
success_ = user_merge_operator_->Merge(ikey.user_key, nullptr,
|
||||
operands_, &merge_result,
|
||||
logger_);
|
||||
success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
|
||||
operands_, &merge_result,
|
||||
logger_);
|
||||
|
||||
// We store the result in keys_.back() and operands_.back()
|
||||
// if nothing went wrong (i.e.: no operand corruption on disk)
|
||||
@ -95,9 +95,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
// => change the entry type to kTypeValue for keys_.back()
|
||||
// We are done! Success!
|
||||
const Slice value = iter->value();
|
||||
success_ = user_merge_operator_->Merge(ikey.user_key, &value,
|
||||
operands_, &merge_result,
|
||||
logger_);
|
||||
success_ = user_merge_operator_->FullMerge(ikey.user_key, &value,
|
||||
operands_, &merge_result,
|
||||
logger_);
|
||||
|
||||
// We store the result in keys_.back() and operands_.back()
|
||||
// if nothing went wrong (i.e.: no operand corruption on disk)
|
||||
@ -170,9 +170,9 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before,
|
||||
assert(kTypeMerge == orig_ikey.type);
|
||||
assert(operands_.size() >= 1);
|
||||
assert(operands_.size() == keys_.size());
|
||||
success_ = user_merge_operator_->Merge(ikey.user_key, nullptr,
|
||||
operands_, &merge_result,
|
||||
logger_);
|
||||
success_ = user_merge_operator_->FullMerge(ikey.user_key, nullptr,
|
||||
operands_, &merge_result,
|
||||
logger_);
|
||||
|
||||
if (success_) {
|
||||
std::string& key = keys_.back(); // The original key encountered
|
||||
|
@ -12,7 +12,7 @@ namespace leveldb {
|
||||
// Given a "real" merge from the library, call the user's
|
||||
// associative merge function one-by-one on each of the operands.
|
||||
// NOTE: It is assumed that the client's merge-operator will handle any errors.
|
||||
bool AssociativeMergeOperator::Merge(
|
||||
bool AssociativeMergeOperator::FullMerge(
|
||||
const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_list,
|
||||
|
@ -286,8 +286,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
||||
} else if (kMerge == s->state) {
|
||||
assert(s->merge_operator != nullptr);
|
||||
s->state = kFound;
|
||||
if (!s->merge_operator->Merge(s->user_key, &v, *ops,
|
||||
s->value, s->logger)) {
|
||||
if (!s->merge_operator->FullMerge(s->user_key, &v, *ops,
|
||||
s->value, s->logger)) {
|
||||
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
||||
s->state = kCorrupt;
|
||||
}
|
||||
@ -301,8 +301,8 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
|
||||
s->state = kDeleted;
|
||||
} else if (kMerge == s->state) {
|
||||
s->state = kFound;
|
||||
if (!s->merge_operator->Merge(s->user_key, nullptr, *ops,
|
||||
s->value, s->logger)) {
|
||||
if (!s->merge_operator->FullMerge(s->user_key, nullptr, *ops,
|
||||
s->value, s->logger)) {
|
||||
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
|
||||
s->state = kCorrupt;
|
||||
}
|
||||
@ -521,8 +521,8 @@ void Version::Get(const ReadOptions& options,
|
||||
if (kMerge == saver.state) {
|
||||
// merge_operands are in saver and we hit the beginning of the key history
|
||||
// do a final merge of nullptr and operands;
|
||||
if (merge_operator->Merge(user_key, nullptr, *saver.merge_operands,
|
||||
value, logger.get())) {
|
||||
if (merge_operator->FullMerge(user_key, nullptr, *saver.merge_operands,
|
||||
value, logger.get())) {
|
||||
*status = Status::OK();
|
||||
} else {
|
||||
RecordTick(db_options.statistics, NUMBER_MERGE_FAILURES);
|
||||
|
@ -29,10 +29,11 @@ class Logger;
|
||||
// into rocksdb); numeric addition and string concatenation are examples;
|
||||
//
|
||||
// b) MergeOperator - the generic class for all the more abstract / complex
|
||||
// operations; one method to merge a Put/Delete value with a merge operand;
|
||||
// and another method (PartialMerge) that merges two operands together.
|
||||
// this is especially useful if your key values have a complex structure,
|
||||
// but you would still like to support client-specific incremental updates.
|
||||
// operations; one method (FullMerge) to merge a Put/Delete value with a
|
||||
// merge operand; and another method (PartialMerge) that merges two
|
||||
// operands together. this is especially useful if your key values have a
|
||||
// complex structure but you would still like to support client-specific
|
||||
// incremental updates.
|
||||
//
|
||||
// AssociativeMergeOperator is simpler to implement. MergeOperator is simply
|
||||
// more powerful.
|
||||
@ -60,11 +61,11 @@ class MergeOperator {
|
||||
// internal corruption. This will be treated as an error by the library.
|
||||
//
|
||||
// Also make use of the *logger for error messages.
|
||||
virtual bool Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_list,
|
||||
std::string* new_value,
|
||||
Logger* logger) const = 0;
|
||||
virtual bool FullMerge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_list,
|
||||
std::string* new_value,
|
||||
Logger* logger) const = 0;
|
||||
|
||||
// This function performs merge(left_op, right_op)
|
||||
// when both the operands are themselves merge operation types
|
||||
@ -85,7 +86,7 @@ class MergeOperator {
|
||||
// TODO: Presently there is no way to differentiate between error/corruption
|
||||
// and simply "return false". For now, the client should simply return
|
||||
// false in any case it cannot perform partial-merge, regardless of reason.
|
||||
// If there is corruption in the data, handle it in the above Merge() function,
|
||||
// If there is corruption in the data, handle it in the FullMerge() function,
|
||||
// and return false there.
|
||||
virtual bool PartialMerge(const Slice& key,
|
||||
const Slice& left_operand,
|
||||
@ -128,11 +129,11 @@ class AssociativeMergeOperator : public MergeOperator {
|
||||
|
||||
private:
|
||||
// Default implementations of the MergeOperator functions
|
||||
virtual bool Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_list,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override;
|
||||
virtual bool FullMerge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_list,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override;
|
||||
|
||||
virtual bool PartialMerge(const Slice& key,
|
||||
const Slice& left_operand,
|
||||
|
@ -60,7 +60,7 @@ class StackableDB : public DB {
|
||||
const Slice& key,
|
||||
std::string* value,
|
||||
bool* value_found = nullptr) override {
|
||||
return KeyMayExist(options, key, value, value_found);
|
||||
return sdb_->KeyMayExist(options, key, value, value_found);
|
||||
}
|
||||
|
||||
virtual Status Delete(const WriteOptions& wopts, const Slice& key) override {
|
||||
|
@ -17,11 +17,11 @@ namespace { // anonymous namespace
|
||||
// From the client-perspective, semantics are the same.
|
||||
class PutOperator : public MergeOperator {
|
||||
public:
|
||||
virtual bool Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_sequence,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override {
|
||||
virtual bool FullMerge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_sequence,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override {
|
||||
// Put basically only looks at the current/latest value
|
||||
assert(!operand_sequence.empty());
|
||||
assert(new_value != nullptr);
|
||||
|
@ -11,7 +11,6 @@ namespace leveldb {
|
||||
|
||||
class StringAppendOperator : public AssociativeMergeOperator {
|
||||
public:
|
||||
|
||||
StringAppendOperator(char delim_char); /// Constructor: specify delimiter
|
||||
|
||||
virtual bool Merge(const Slice& key,
|
||||
|
@ -20,11 +20,12 @@ StringAppendTESTOperator::StringAppendTESTOperator(char delim_char)
|
||||
}
|
||||
|
||||
// Implementation for the merge operation (concatenates two strings)
|
||||
bool StringAppendTESTOperator::Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operands,
|
||||
std::string* new_value,
|
||||
Logger* logger) const {
|
||||
bool StringAppendTESTOperator::FullMerge(
|
||||
const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operands,
|
||||
std::string* new_value,
|
||||
Logger* logger) const {
|
||||
|
||||
// Clear the *new_value for writing.
|
||||
assert(new_value);
|
||||
|
@ -20,11 +20,11 @@ class StringAppendTESTOperator : public MergeOperator {
|
||||
|
||||
StringAppendTESTOperator(char delim_char); /// Constructor with delimiter
|
||||
|
||||
virtual bool Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_sequence,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override;
|
||||
virtual bool FullMerge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operand_sequence,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override;
|
||||
|
||||
virtual bool PartialMerge(const Slice& key,
|
||||
const Slice& left_operand,
|
||||
|
@ -96,6 +96,101 @@ class StringLists {
|
||||
// THE TEST CASES BEGIN HERE
|
||||
|
||||
class StringAppendOperatorTest { };
|
||||
TEST(StringAppendOperatorTest, IteratorTest) {
|
||||
DestroyDB(kDbName, Options()); // Start this test with a fresh DB
|
||||
|
||||
StringAppendOperator append_op(',');
|
||||
auto db_ = OpenDb(&append_op);
|
||||
StringLists slists(db_);
|
||||
|
||||
slists.Append("k1","v1");
|
||||
slists.Append("k1","v2");
|
||||
slists.Append("k1","v3");
|
||||
|
||||
slists.Append("k2","a1");
|
||||
slists.Append("k2","a2");
|
||||
slists.Append("k2","a3");
|
||||
|
||||
std::string res;
|
||||
std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
std::string k1("k1");
|
||||
std::string k2("k2");
|
||||
bool first = true;
|
||||
for (it->Seek(k1); it->Valid(); it->Next()) {
|
||||
res = it->value().ToString();
|
||||
if (first) {
|
||||
ASSERT_EQ(res, "v1,v2,v3");
|
||||
first = false;
|
||||
} else {
|
||||
ASSERT_EQ(res, "a1,a2,a3");
|
||||
}
|
||||
}
|
||||
slists.Append("k2", "a4");
|
||||
slists.Append("k1", "v4");
|
||||
|
||||
// Snapshot should still be the same. Should ignore a4 and v4.
|
||||
first = true;
|
||||
for (it->Seek(k1); it->Valid(); it->Next()) {
|
||||
res = it->value().ToString();
|
||||
if (first) {
|
||||
ASSERT_EQ(res, "v1,v2,v3");
|
||||
first = false;
|
||||
} else {
|
||||
ASSERT_EQ(res, "a1,a2,a3");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Should release the snapshot and be aware of the new stuff now
|
||||
it.reset(db_->NewIterator(ReadOptions()));
|
||||
first = true;
|
||||
for (it->Seek(k1); it->Valid(); it->Next()) {
|
||||
res = it->value().ToString();
|
||||
if (first) {
|
||||
ASSERT_EQ(res, "v1,v2,v3,v4");
|
||||
first = false;
|
||||
} else {
|
||||
ASSERT_EQ(res, "a1,a2,a3,a4");
|
||||
}
|
||||
}
|
||||
|
||||
// start from k2 this time.
|
||||
for (it->Seek(k2); it->Valid(); it->Next()) {
|
||||
res = it->value().ToString();
|
||||
if (first) {
|
||||
ASSERT_EQ(res, "v1,v2,v3,v4");
|
||||
first = false;
|
||||
} else {
|
||||
ASSERT_EQ(res, "a1,a2,a3,a4");
|
||||
}
|
||||
}
|
||||
|
||||
slists.Append("k3","g1");
|
||||
|
||||
it.reset(db_->NewIterator(ReadOptions()));
|
||||
first = true;
|
||||
std::string k3("k3");
|
||||
for(it->Seek(k2); it->Valid(); it->Next()) {
|
||||
res = it->value().ToString();
|
||||
if (first) {
|
||||
ASSERT_EQ(res, "a1,a2,a3,a4");
|
||||
first = false;
|
||||
} else {
|
||||
ASSERT_EQ(res, "g1");
|
||||
}
|
||||
}
|
||||
for(it->Seek(k3); it->Valid(); it->Next()) {
|
||||
res = it->value().ToString();
|
||||
if (first) {
|
||||
// should not be hit
|
||||
ASSERT_EQ(res, "a1,a2,a3,a4");
|
||||
first = false;
|
||||
} else {
|
||||
ASSERT_EQ(res, "g1");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TEST(StringAppendOperatorTest,SimpleTest) {
|
||||
DestroyDB(kDbName, Options()); // Start this test with a fresh DB
|
||||
|
@ -154,7 +154,7 @@ std::vector<Status> DBWithTTL::MultiGet(const ReadOptions& options,
|
||||
supported with TTL"));
|
||||
}
|
||||
|
||||
bool DBWithTTL::KeyMayExist(ReadOptions& options,
|
||||
bool DBWithTTL::KeyMayExist(const ReadOptions& options,
|
||||
const Slice& key,
|
||||
std::string* value,
|
||||
bool* value_found) {
|
||||
|
@ -36,10 +36,10 @@ class DBWithTTL : public StackableDB {
|
||||
const std::vector<Slice>& keys,
|
||||
std::vector<std::string>* values);
|
||||
|
||||
virtual bool KeyMayExist(ReadOptions& options,
|
||||
virtual bool KeyMayExist(const ReadOptions& options,
|
||||
const Slice& key,
|
||||
std::string* value,
|
||||
bool* value_found = nullptr);
|
||||
bool* value_found = nullptr) override;
|
||||
|
||||
virtual Status Delete(const WriteOptions& wopts, const Slice& key);
|
||||
|
||||
@ -259,11 +259,11 @@ class TtlMergeOperator : public MergeOperator {
|
||||
assert(merge_op);
|
||||
}
|
||||
|
||||
virtual bool Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operands,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override {
|
||||
virtual bool FullMerge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const std::deque<std::string>& operands,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override {
|
||||
const uint32_t ts_len = DBWithTTL::kTSLength;
|
||||
if (existing_value && existing_value->size() < ts_len) {
|
||||
Log(logger, "Error: Could not remove timestamp from existing value.");
|
||||
@ -281,14 +281,20 @@ class TtlMergeOperator : public MergeOperator {
|
||||
}
|
||||
|
||||
// Apply the user merge operator (store result in *new_value)
|
||||
bool good = true;
|
||||
if (existing_value) {
|
||||
Slice existing_value_without_ts(existing_value->data(),
|
||||
existing_value->size() - ts_len);
|
||||
user_merge_op_->Merge(key, &existing_value_without_ts,
|
||||
operands_without_ts, new_value, logger);
|
||||
good = user_merge_op_->FullMerge(key, &existing_value_without_ts,
|
||||
operands_without_ts, new_value, logger);
|
||||
} else {
|
||||
user_merge_op_->Merge(key, nullptr, operands_without_ts, new_value,
|
||||
logger);
|
||||
good = user_merge_op_->FullMerge(key, nullptr, operands_without_ts,
|
||||
new_value, logger);
|
||||
}
|
||||
|
||||
// Return false if the user merge operator returned false
|
||||
if (!good) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Augment the *new_value with the ttl time-stamp
|
||||
@ -321,8 +327,10 @@ class TtlMergeOperator : public MergeOperator {
|
||||
assert(new_value);
|
||||
Slice left_without_ts(left_operand.data(), left_operand.size() - ts_len);
|
||||
Slice right_without_ts(right_operand.data(), right_operand.size() - ts_len);
|
||||
user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
|
||||
new_value, logger);
|
||||
if (!user_merge_op_->PartialMerge(key, left_without_ts, right_without_ts,
|
||||
new_value, logger)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Augment the *new_value with the ttl time-stamp
|
||||
int32_t curtime;
|
||||
|
Loading…
Reference in New Issue
Block a user