diff --git a/Makefile b/Makefile index cc536b6f1..1d9f02b0e 100644 --- a/Makefile +++ b/Makefile @@ -60,8 +60,8 @@ TESTS = \ reduce_levels_test \ write_batch_test \ auto_roll_logger_test \ - filelock_test - + filelock_test \ + merge_test TOOLS = \ sst_dump \ @@ -225,6 +225,9 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +merge_test: db/merge_test.o $(LIBOBJECTS) + $(CXX) db/merge_test.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) + $(MEMENVLIBRARY) : $(MEMENVOBJECTS) rm -f $@ $(AR) -rs $@ $(MEMENVOBJECTS) diff --git a/db/builder.cc b/db/builder.cc index 450fcf051..98e43195c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -6,6 +6,7 @@ #include "db/filename.h" #include "db/dbformat.h" +#include "db/merge_helper.h" #include "db/table_cache.h" #include "db/version_edit.h" #include "leveldb/db.h" @@ -49,36 +50,75 @@ Status BuildTable(const std::string& dbname, Slice key = iter->key(); meta->smallest.DecodeFrom(key); + MergeHelper merge(user_comparator, options.merge_operator, + options.info_log.get(), + true /* internal key corruption is not ok */); + if (purge) { + ParsedInternalKey ikey; + // Ugly walkaround to avoid compiler error for release build + // TODO: find a clean way to treat in memory key corruption + ikey.type = kTypeValue; ParsedInternalKey prev_ikey; std::string prev_value; std::string prev_key; - // store first key-value - prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); - ParseInternalKey(Slice(prev_key), &prev_ikey); - assert(prev_ikey.sequence >= earliest_seqno_in_memtable); + // Ugly walkaround to avoid compiler error for release build + // TODO: find a clean way to treat in memory key corruption + auto ok __attribute__((unused)) = ParseInternalKey(key, &ikey); + // in-memory key corruption is not ok; + assert(ok); - for (iter->Next(); iter->Valid(); iter->Next()) { + if (ikey.type == kTypeMerge) { + // merge values if the first entry is of merge type + merge.MergeUntil(iter, 0 /* don't worry about snapshot */); + prev_key.assign(merge.key().data(), merge.key().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + prev_value.assign(merge.value().data(), merge.value().size()); + } else { + // store first key-value + prev_key.assign(key.data(), key.size()); + prev_value.assign(iter->value().data(), iter->value().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + assert(prev_ikey.sequence >= earliest_seqno_in_memtable); + iter->Next(); + } + + while (iter->Valid()) { + bool iterator_at_next = false; ParsedInternalKey this_ikey; Slice key = iter->key(); - ParseInternalKey(key, &this_ikey); + ok = ParseInternalKey(key, &this_ikey); + assert(ok); assert(this_ikey.sequence >= earliest_seqno_in_memtable); if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) { // This key is different from previous key. // Output prev key and remember current key builder->Add(Slice(prev_key), Slice(prev_value)); - prev_key.assign(key.data(), key.size()); - prev_value.assign(iter->value().data(), iter->value().size()); - ParseInternalKey(Slice(prev_key), &prev_ikey); + if (this_ikey.type == kTypeMerge) { + merge.MergeUntil(iter, 0 /* don't worry about snapshot */); + iterator_at_next = true; + prev_key.assign(merge.key().data(), merge.key().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + prev_value.assign(merge.value().data(), merge.value().size()); + } else { + prev_key.assign(key.data(), key.size()); + prev_value.assign(iter->value().data(), iter->value().size()); + ok = ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(ok); + } } else { // seqno within the same key are in decreasing order assert(this_ikey.sequence < prev_ikey.sequence); // This key is an earlier version of the same key in prev_key. // Skip current key. } + + if (!iterator_at_next) iter->Next(); } // output last key builder->Add(Slice(prev_key), Slice(prev_value)); diff --git a/db/db_impl.cc b/db/db_impl.cc index 54f6d34dd..051e9806a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -21,12 +21,14 @@ #include "db/log_writer.h" #include "db/memtable.h" #include "db/memtablelist.h" +#include "db/merge_helper.h" #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" #include "db/transaction_log_iterator_impl.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/merge_operator.h" #include "leveldb/statistics.h" #include "leveldb/status.h" #include "leveldb/table_builder.h" @@ -497,6 +499,7 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table, if (!env_->FileExists(CurrentFileName(dbname_))) { if (options_.create_if_missing) { + // TODO: add merge_operator name check s = NewDB(); if (!s.ok()) { return s; @@ -1514,11 +1517,13 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { // Employ a sequential search because the total number of // snapshots are typically small. inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( - SequenceNumber in, std::vector& snapshots) { + SequenceNumber in, std::vector& snapshots, + SequenceNumber* prev_snapshot) { SequenceNumber prev __attribute__((unused)) = 0; for (const auto cur : snapshots) { assert(prev <= cur); if (cur >= in) { + *prev_snapshot = prev; return cur; } prev = cur; // assignment @@ -1591,6 +1596,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber; std::string compaction_filter_value; + MergeHelper merge(user_comparator(), options_.merge_operator, + options_.info_log.get(), + false /* internal key corruption is expected */); for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { @@ -1617,8 +1625,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Handle key/value, add to state, etc. bool drop = false; + bool current_entry_is_merged = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys + // TODO: error key stays in db forever? Figure out the intention/rationale + // v10 error v8 : we cannot hide v8 even though it's pretty obvious. current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; @@ -1637,15 +1648,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If there are no snapshots, then this kv affect visibility at tip. // Otherwise, search though all existing snapshots to find // the earlist snapshot that is affected by this kv. - SequenceNumber visible = visible_at_tip ? visible_at_tip : - findEarliestVisibleSnapshot(ikey.sequence, - compact->existing_snapshots); + SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot + SequenceNumber visible = visible_at_tip ? + visible_at_tip : + findEarliestVisibleSnapshot(ikey.sequence, + compact->existing_snapshots, + &prev_snapshot); if (visible_in_snapshot == visible) { // If the earliest snapshot is which this key is visible in // is the same as the visibily of a previous instance of the // same key, then this kv is not visible in any snapshot. // Hidden by an newer entry for same user key + // TODO: why not > ? assert(last_sequence_for_key >= ikey.sequence); drop = true; // (A) RecordTick(options_.statistics, COMPACTION_KEY_DROP_NEWER_ENTRY); @@ -1661,6 +1676,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Therefore this deletion marker is obsolete and can be dropped. drop = true; RecordTick(options_.statistics, COMPACTION_KEY_DROP_OBSOLETE); + } else if (ikey.type == kTypeMerge) { + // We know the merge type entry is not hidden, otherwise we would + // have hit (A) + // We encapsulate the merge related state machine in a different + // object to minimize change to the existing flow. Turn out this + // logic could also be nicely re-used for memtable flush purge + // optimization in BuildTable. + merge.MergeUntil(input.get(), prev_snapshot, bottommost_level); + current_entry_is_merged = true; + // get the merge result + key = merge.key(); + ParseInternalKey(key, &ikey); + value = merge.value(); } else if (options_.CompactionFilter != nullptr && ikey.type != kTypeDeletion && ikey.sequence < earliest_snapshot) { @@ -1709,7 +1737,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot) { + if (bottommost_level && ikey.sequence < earliest_snapshot && + ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems // with the priority queue that is managing the input key iterator @@ -1744,7 +1773,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } } - input->Next(); + // MergeUntil has moved input to the next entry + if (!current_entry_is_merged) { + input->Next(); + } } if (status.ok() && shutting_down_.Acquire_Load()) { @@ -1906,14 +1938,17 @@ Status DBImpl::Get(const ReadOptions& options, mutex_.Unlock(); bool have_stat_update = false; Version::GetStats stats; + // First look in the memtable, then in the immutable memtable (if any). + // s is both in/out. When in, s could either be OK or MergeInProgress. + // value will contain the current merge operand in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s)) { + if (mem->Get(lkey, value, &s, options_)) { // Done - } else if (imm.Get(lkey, value, &s)) { + } else if (imm.Get(lkey, value, &s, options_)) { // Done } else { - s = current->Get(options, lkey, value, &stats); + current->Get(options, lkey, value, &s, &stats, options_); have_stat_update = true; } mutex_.Lock(); @@ -1934,7 +1969,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); return NewDBIterator( - &dbname_, env_, user_comparator(), internal_iter, + &dbname_, env_, options_, user_comparator(), internal_iter, (options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); @@ -1955,6 +1990,15 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } +Status DBImpl::Merge(const WriteOptions& o, const Slice& key, + const Slice& val) { + if (!options_.merge_operator) { + return Status::NotSupported("Provide a merge_operator when opening DB"); + } else { + return DB::Merge(o, key, val); + } +} + Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -2382,6 +2426,13 @@ Status DB::Delete(const WriteOptions& opt, const Slice& key) { return Write(opt, &batch); } +Status DB::Merge(const WriteOptions& opt, const Slice& key, + const Slice& value) { + WriteBatch batch; + batch.Merge(key, value); + return Write(opt, &batch); +} + DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, diff --git a/db/db_impl.h b/db/db_impl.h index 2be00c9d8..2c241ba6b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -36,6 +36,8 @@ class DBImpl : public DB { // Implementations of the DB interface virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value); + virtual Status Merge(const WriteOptions&, const Slice& key, + const Slice& value); virtual Status Delete(const WriteOptions&, const Slice& key); virtual Status Write(const WriteOptions& options, WriteBatch* updates); virtual Status Get(const ReadOptions& options, @@ -339,9 +341,12 @@ class DBImpl : public DB { // dump the delayed_writes_ to the log file and reset counter. void DelayLoggingAndReset(); - // find the earliest snapshot where seqno is visible - inline SequenceNumber findEarliestVisibleSnapshot(SequenceNumber in, - std::vector& snapshots); + // Return the earliest snapshot where seqno is visible. + // Store the snapshot right before that, if any, in prev_snapshot + inline SequenceNumber findEarliestVisibleSnapshot( + SequenceNumber in, + std::vector& snapshots, + SequenceNumber* prev_snapshot); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 2ec52970b..d4170d30f 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -54,10 +54,10 @@ Status DBImplReadOnly::Get(const ReadOptions& options, Version* current = versions_->current(); SequenceNumber snapshot = versions_->LastSequence(); LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s)) { + if (mem->Get(lkey, value, &s, options_)) { } else { Version::GetStats stats; - s = current->Get(options, lkey, value, &stats); + current->Get(options, lkey, value, &s, &stats, options_); } return s; } @@ -66,7 +66,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot); return NewDBIterator( - &dbname_, env_, user_comparator(), internal_iter, + &dbname_, env_, options_, user_comparator(),internal_iter, (options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 403b35452..317d290d0 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -37,6 +37,10 @@ public: virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) { return Status::NotSupported("Not supported operation in read only mode."); } + virtual Status Merge(const WriteOptions&, const Slice& key, + const Slice& value) { + return Status::NotSupported("Not supported operation in read only mode."); + } virtual Status Delete(const WriteOptions&, const Slice& key) { return Status::NotSupported("Not supported operation in read only mode."); } diff --git a/db/db_iter.cc b/db/db_iter.cc index 87dca2ded..a06bde51b 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -2,12 +2,15 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include "db/db_iter.h" #include "db/filename.h" #include "db/dbformat.h" #include "leveldb/env.h" +#include "leveldb/options.h" #include "leveldb/iterator.h" +#include "leveldb/merge_operator.h" #include "port/port.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -36,6 +39,7 @@ namespace { // numbers, deletion markers, overwrites, etc. class DBIter: public Iterator { public: + // The following is grossly complicated. TODO: clean it up // Which direction is the iterator currently moving? // (1) When moving forward, the internal iterator is positioned at // the exact entry that yields this->key(), this->value() @@ -46,15 +50,18 @@ class DBIter: public Iterator { kReverse }; - DBIter(const std::string* dbname, Env* env, + DBIter(const std::string* dbname, Env* env, const Options& options, const Comparator* cmp, Iterator* iter, SequenceNumber s) : dbname_(dbname), env_(env), + logger_(options.info_log), user_comparator_(cmp), + user_merge_operator_(options.merge_operator), iter_(iter), sequence_(s), direction_(kForward), - valid_(false) { + valid_(false), + current_entry_is_merged_(false) { } virtual ~DBIter() { delete iter_; @@ -62,11 +69,12 @@ class DBIter: public Iterator { virtual bool Valid() const { return valid_; } virtual Slice key() const { assert(valid_); - return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_; + return saved_key_; } virtual Slice value() const { assert(valid_); - return (direction_ == kForward) ? iter_->value() : saved_value_; + return (direction_ == kForward && !current_entry_is_merged_) ? + iter_->value() : saved_value_; } virtual Status status() const { if (status_.ok()) { @@ -83,9 +91,10 @@ class DBIter: public Iterator { virtual void SeekToLast(); private: - void FindNextUserEntry(bool skipping, std::string* skip); + void FindNextUserEntry(bool skipping); void FindPrevUserEntry(); bool ParseKey(ParsedInternalKey* key); + void MergeValuesNewToOld(); inline void SaveKey(const Slice& k, std::string* dst) { dst->assign(k.data(), k.size()); @@ -102,15 +111,19 @@ class DBIter: public Iterator { const std::string* const dbname_; Env* const env_; + shared_ptr logger_; const Comparator* const user_comparator_; + const MergeOperator* const user_merge_operator_; Iterator* const iter_; SequenceNumber const sequence_; Status status_; std::string saved_key_; // == current key when direction_==kReverse std::string saved_value_; // == current raw value when direction_==kReverse + std::string skip_key_; Direction direction_; bool valid_; + bool current_entry_is_merged_; // No copying allowed DBIter(const DBIter&); @@ -120,6 +133,8 @@ class DBIter: public Iterator { inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { if (!ParseInternalKey(iter_->key(), ikey)) { status_ = Status::Corruption("corrupted internal key in DBIter"); + Log(logger_, "corrupted internal key in DBIter: %s", + iter_->key().ToString(true).c_str()); return false; } else { return true; @@ -146,47 +161,136 @@ void DBIter::Next() { } } - // Temporarily use saved_key_ as storage for key to skip. - std::string* skip = &saved_key_; - SaveKey(ExtractUserKey(iter_->key()), skip); - FindNextUserEntry(true, skip); + // If the current value is merged, we might already hit end of iter_ + if (!iter_->Valid()) { + valid_ = false; + return; + } + FindNextUserEntry(true /* skipping the current user key */); } -void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { + +// PRE: saved_key_ has the current user key if skipping +// POST: saved_key_ should have the next user key if valid_, +// if the current entry is a result of merge +// current_entry_is_merged_ => true +// saved_value_ => the merged value +// +// NOTE: In between, saved_key_ can point to a user key that has +// a delete marker +void DBIter::FindNextUserEntry(bool skipping) { // Loop until we hit an acceptable entry to yield assert(iter_->Valid()); assert(direction_ == kForward); + current_entry_is_merged_ = false; do { ParsedInternalKey ikey; if (ParseKey(&ikey) && ikey.sequence <= sequence_) { - switch (ikey.type) { - case kTypeDeletion: - // Arrange to skip all upcoming entries for this key since - // they are hidden by this deletion. - SaveKey(ikey.user_key, skip); - skipping = true; - break; - case kTypeValue: - if (skipping && - user_comparator_->Compare(ikey.user_key, *skip) <= 0) { - // Entry hidden - } else { + if (skipping && + user_comparator_->Compare(ikey.user_key, saved_key_) <= 0) { + // skip this entry + } else { + skipping = false; + switch (ikey.type) { + case kTypeDeletion: + // Arrange to skip all upcoming entries for this key since + // they are hidden by this deletion. + SaveKey(ikey.user_key, &saved_key_); + skipping = true; + break; + case kTypeValue: valid_ = true; - saved_key_.clear(); + SaveKey(ikey.user_key, &saved_key_); return; - } - break; + case kTypeMerge: + // By now, we are sure the current ikey is going to yield a value + SaveKey(ikey.user_key, &saved_key_); + current_entry_is_merged_ = true; + valid_ = true; + // Go to a different state machine + MergeValuesNewToOld(); + // TODO: what if !iter_->Valid() + return; + break; + } } } iter_->Next(); } while (iter_->Valid()); - saved_key_.clear(); valid_ = false; } +// Merge values of the same user key starting from the current iter_ position +// Scan from the newer entries to older entries. +// PRE: iter_->key() points to the first merge type entry +// saved_key_ stores the user key +// POST: saved_value_ has the merged value for the user key +// iter_ points to the next entry (or invalid) +void DBIter::MergeValuesNewToOld() { + + const Slice value = iter_->value(); + std::string operand(value.data(), value.size()); + + ParsedInternalKey ikey; + for (iter_->Next(); iter_->Valid(); iter_->Next()) { + if (!ParseKey(&ikey)) { + // skip corrupted key + continue; + } + + if (user_comparator_->Compare(ikey.user_key, saved_key_) != 0) { + // hit the next user key, stop right here + break; + } + + if (kTypeDeletion == ikey.type) { + // hit a delete with the same user key, stop right here + // iter_ is positioned after delete + iter_->Next(); + break; + } + + if (kTypeValue == ikey.type) { + // hit a put, merge the put value with operand and store it in the + // final result saved_value_. We are done! + const Slice value = iter_->value(); + user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), + &saved_value_, logger_.get()); + // iter_ is positioned after put + iter_->Next(); + return; + } + + if (kTypeMerge == ikey.type) { + // hit a merge, merge the value with operand and continue. + // saved_value_ is used as a scratch area. The result is put + // back in operand + const Slice value = iter_->value(); + user_merge_operator_->Merge(ikey.user_key, &value, operand, + &saved_value_, logger_.get()); + swap(saved_value_, operand); + } + } + + // we either exhausted all internal keys under this user key, or hit + // a deletion marker. + // feed null as the existing value to the merge opexrator, such that + // client can differentiate this scenario and do things accordingly. + user_merge_operator_->Merge(ikey.user_key, nullptr, operand, + &saved_value_, logger_.get()); +} + void DBIter::Prev() { assert(valid_); + // TODO: support backward iteration + // Throw an exception now if merge_operator is provided + if (user_merge_operator_) { + Log(logger_, "Prev not supported yet if merge_operator is provided"); + throw std::logic_error("DBIter::Prev backward iteration not supported" + " if merge_operator is provided"); + } + if (direction_ == kForward) { // Switch directions? // iter_ is pointing at the current entry. Scan backwards until // the key changes so we can use the normal reverse scanning code. @@ -261,7 +365,7 @@ void DBIter::Seek(const Slice& target) { &saved_key_, ParsedInternalKey(target, sequence_, kValueTypeForSeek)); iter_->Seek(saved_key_); if (iter_->Valid()) { - FindNextUserEntry(false, &saved_key_ /* temporary storage */); + FindNextUserEntry(false /*not skipping */); } else { valid_ = false; } @@ -272,13 +376,21 @@ void DBIter::SeekToFirst() { ClearSavedValue(); iter_->SeekToFirst(); if (iter_->Valid()) { - FindNextUserEntry(false, &saved_key_ /* temporary storage */); + FindNextUserEntry(false /* not skipping */); } else { valid_ = false; } } void DBIter::SeekToLast() { + // TODO: support backward iteration + // throw an exception for now if merge_operator is provided + if (user_merge_operator_) { + Log(logger_, "SeekToLast not supported yet if merge_operator is provided"); + throw std::logic_error("DBIter::SeekToLast: backward iteration not" + " supported if merge_operator is provided"); + } + direction_ = kReverse; ClearSavedValue(); iter_->SeekToLast(); @@ -290,10 +402,12 @@ void DBIter::SeekToLast() { Iterator* NewDBIterator( const std::string* dbname, Env* env, - const Comparator* user_key_comparator, + const Options& options, + const Comparator *user_key_comparator, Iterator* internal_iter, const SequenceNumber& sequence) { - return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence); + return new DBIter(dbname, env, options, user_key_comparator, + internal_iter, sequence); } } // namespace leveldb diff --git a/db/db_iter.h b/db/db_iter.h index d9e1b174a..a8d849bb9 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -17,7 +17,8 @@ namespace leveldb { extern Iterator* NewDBIterator( const std::string* dbname, Env* env, - const Comparator* user_key_comparator, + const Options& options, + const Comparator *user_key_comparator, Iterator* internal_iter, const SequenceNumber& sequence); diff --git a/db/db_test.cc b/db/db_test.cc index b1ebd9004..6c042f05d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,6 +20,7 @@ #include "util/testharness.h" #include "util/testutil.h" #include "util/storage_options.h" +#include "utilities/merge_operators.h" namespace leveldb { @@ -209,6 +210,7 @@ class DBTest { // Sequence of option configurations to try enum OptionConfig { kDefault, + kMergePut, kFilter, kUncompressed, kNumLevel_3, @@ -220,6 +222,8 @@ class DBTest { }; int option_config_; + std::shared_ptr merge_operator_; + public: std::string dbname_; SpecialEnv* env_; @@ -228,6 +232,7 @@ class DBTest { Options last_options_; DBTest() : option_config_(kDefault), + merge_operator_(MergeOperators::CreatePutOperator()), env_(new SpecialEnv(Env::Default())) { filter_policy_ = NewBloomFilterPolicy(10); dbname_ = test::TmpDir() + "/db_test"; @@ -259,6 +264,9 @@ class DBTest { Options CurrentOptions() { Options options; switch (option_config_) { + case kMergePut: + options.merge_operator = merge_operator_.get(); + break; case kFilter: options.filter_policy = filter_policy_; break; @@ -326,8 +334,12 @@ class DBTest { return DB::Open(opts, dbname_, &db_); } - Status Put(const std::string& k, const std::string& v) { - return db_->Put(WriteOptions(), k, v); + Status Put(const Slice& k, const Slice& v) { + if (kMergePut == option_config_ ) { + return db_->Merge(WriteOptions(), k, v); + } else { + return db_->Put(WriteOptions(), k, v); + } } Status Delete(const std::string& k) { @@ -400,6 +412,10 @@ class DBTest { case kTypeValue: result += iter->value().ToString(); break; + case kTypeMerge: + // keep it the same as kTypeValue for testing kMergePut + result += iter->value().ToString(); + break; case kTypeDeletion: result += "DEL"; break; @@ -935,8 +951,11 @@ TEST(DBTest, IterMultiWithDelete) { Iterator* iter = db_->NewIterator(ReadOptions()); iter->Seek("c"); ASSERT_EQ(IterStatus(iter), "c->vc"); - iter->Prev(); - ASSERT_EQ(IterStatus(iter), "a->va"); + if (!CurrentOptions().merge_operator) { + // TODO: merge operator does not support backward iteration yet + iter->Prev(); + ASSERT_EQ(IterStatus(iter), "a->va"); + } delete iter; } while (ChangeOptions()); } @@ -2822,7 +2841,7 @@ static void MTThreadBody(void* arg) { // We add some padding for force compactions. snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d", key, id, static_cast(counter)); - ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf))); + ASSERT_OK(t->state->test->Put(Slice(keybuf), Slice(valbuf))); } else { // Read a value and verify that it matches the pattern written above. Status s = db->Get(ReadOptions(), Slice(keybuf), &value); @@ -2895,6 +2914,9 @@ class ModelDB: public DB { virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) { return DB::Put(o, k, v); } + virtual Status Merge(const WriteOptions& o, const Slice& k, const Slice& v) { + return DB::Merge(o, k, v); + } virtual Status Delete(const WriteOptions& o, const Slice& key) { return DB::Delete(o, key); } @@ -2930,6 +2952,10 @@ class ModelDB: public DB { virtual void Put(const Slice& key, const Slice& value) { (*map_)[key.ToString()] = value.ToString(); } + virtual void Merge(const Slice& key, const Slice& value) { + // ignore merge for now + //(*map_)[key.ToString()] = value.ToString(); + } virtual void Delete(const Slice& key) { map_->erase(key.ToString()); } diff --git a/db/dbformat.h b/db/dbformat.h index fb8525b0b..09af082e9 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -24,7 +24,8 @@ class InternalKey; // data structures. enum ValueType { kTypeDeletion = 0x0, - kTypeValue = 0x1 + kTypeValue = 0x1, + kTypeMerge = 0x2 }; // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular @@ -32,7 +33,7 @@ enum ValueType { // and the value type is embedded as the low 8 bits in the sequence // number in internal keys, we need to use the highest-numbered // ValueType, not the lowest). -static const ValueType kValueTypeForSeek = kTypeValue; +static const ValueType kValueTypeForSeek = kTypeMerge; // We leave eight bits empty at the bottom so a type and sequence# // can be packed together into 64-bits. @@ -154,7 +155,7 @@ inline bool ParseInternalKey(const Slice& internal_key, result->sequence = num >> 8; result->type = static_cast(c); result->user_key = Slice(internal_key.data(), n - 8); - return (c <= static_cast(kTypeValue)); + return (c <= static_cast(kValueTypeForSeek)); } // Update the sequence number in the internal key diff --git a/db/memtable.cc b/db/memtable.cc index efe383fe7..097b60ff5 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -7,6 +7,7 @@ #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/iterator.h" +#include "leveldb/merge_operator.h" #include "util/coding.h" namespace leveldb { @@ -116,11 +117,23 @@ void MemTable::Add(SequenceNumber s, ValueType type, } } -bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, + const Options& options) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); - if (iter.Valid()) { + + bool merge_in_progress = false; + std::string operand; + if (s->IsMergeInProgress()) { + swap(*value, operand); + merge_in_progress = true; + } + + + auto merge_operator = options.merge_operator; + auto logger = options.info_log; + for (; iter.Valid(); iter.Next()) { // entry format is: // klength varint32 // userkey char[klength-8] @@ -141,15 +154,44 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - value->assign(v.data(), v.size()); + if (merge_in_progress) { + merge_operator->Merge(key.user_key(), &v, operand, + value, logger.get()); + } else { + value->assign(v.data(), v.size()); + } return true; } - case kTypeDeletion: - *s = Status::NotFound(Slice()); + case kTypeMerge: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + if (merge_in_progress) { + merge_operator->Merge(key.user_key(), &v, operand, + value, logger.get()); + swap(*value, operand); + } else { + assert(merge_operator); + merge_in_progress = true; + operand.assign(v.data(), v.size()); + } + break; + } + case kTypeDeletion: { + if (merge_in_progress) { + merge_operator->Merge(key.user_key(), nullptr, operand, + value, logger.get()); + } else { + *s = Status::NotFound(Slice()); + } return true; + } } } } + + if (merge_in_progress) { + swap(*value, operand); + *s = Status::MergeInProgress(""); + } return false; } diff --git a/db/memtable.h b/db/memtable.h index 61aa29205..8fb9ce943 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -62,8 +62,13 @@ class MemTable { // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. + // If memtable contains Merge operation as the most recent entry for a key, + // and the merge process does not stop (not reaching a value or delete), + // store the current merged result in value and MergeInProgress in s. + // return false // Else, return false. - bool Get(const LookupKey& key, std::string* value, Status* s); + bool Get(const LookupKey& key, std::string* value, Status* s, + const Options& options); // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } diff --git a/db/memtablelist.cc b/db/memtablelist.cc index c1ccda1a5..c12995726 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -174,10 +174,11 @@ size_t MemTableList::ApproximateMemoryUsage() { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. -bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s) { +bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, + const Options& options ) { for (list::iterator it = memlist_.begin(); it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s)) { + if ((*it)->Get(key, value, s, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index 9ab91a67a..de27150ef 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -71,7 +71,8 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. - bool Get(const LookupKey& key, std::string* value, Status* s); + bool Get(const LookupKey& key, std::string* value, Status* s, + const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector* list); diff --git a/db/merge_helper.cc b/db/merge_helper.cc new file mode 100644 index 000000000..3520db15a --- /dev/null +++ b/db/merge_helper.cc @@ -0,0 +1,114 @@ +#include "merge_helper.h" +#include "db/dbformat.h" +#include "leveldb/comparator.h" +#include "leveldb/db.h" +#include "leveldb/merge_operator.h" +#include +#include + +namespace leveldb { + +// PRE: iter points to the first merge type entry +// POST: iter points to the first entry beyond the merge process (or the end) +// key_, value_ are updated to reflect the merge result +void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, + bool at_bottom) { + // get a copy of the internal key, before it's invalidated by iter->Next() + key_.assign(iter->key().data(), iter->key().size()); + // we need to parse the internal key again as the parsed key is + // backed by the internal key! + ParsedInternalKey orig_ikey; + // Assume no internal key corruption as it has been successfully parsed + // by the caller. + // TODO: determine a good alternative of assert (exception?) + ParseInternalKey(key_, &orig_ikey); + std::string operand(iter->value().data(), iter->value().size()); + + bool hit_the_next_user_key = false; + ParsedInternalKey ikey; + for (iter->Next(); iter->Valid(); iter->Next()) { + if (!ParseInternalKey(iter->key(), &ikey)) { + // stop at corrupted key + if (assert_valid_internal_key_) { + assert(!"corrupted internal key is not expected"); + } + break; + } + + if (user_comparator_->Compare(ikey.user_key, orig_ikey.user_key) != 0) { + // hit a different user key, stop right here + hit_the_next_user_key = true; + break; + } + + if (stop_before && ikey.sequence <= stop_before) { + // hit an entry that's visible by the previous snapshot, can't touch that + break; + } + + if (kTypeDeletion == ikey.type) { + // hit a delete + // => merge nullptr with operand + // => change the entry type to kTypeValue + // We are done! + user_merge_operator_->Merge(ikey.user_key, nullptr, operand, + &value_, logger_); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key_[0], key_.size(), + orig_ikey.sequence, orig_ikey.type); + // move iter to the next entry + iter->Next(); + return; + } + + if (kTypeValue == ikey.type) { + // hit a put + // => merge the put value with operand + // => change the entry type to kTypeValue + // We are done! + const Slice value = iter->value(); + user_merge_operator_->Merge(ikey.user_key, &value, Slice(operand), + &value_, logger_); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key_[0], key_.size(), + orig_ikey.sequence, orig_ikey.type); + // move iter to the next entry + iter->Next(); + return; + } + + if (kTypeMerge == ikey.type) { + // hit a merge + // => merge the value with operand. + // => put the result back to operand and continue + const Slice value = iter->value(); + user_merge_operator_->Merge(ikey.user_key, &value, operand, + &value_, logger_); + swap(value_, operand); + continue; + } + } + + // We have seen the root history of this key if we are at the + // bottem level and exhausted all internal keys of this user key + // NOTE: !iter->Valid() does not necessarily mean we hit the + // beginning of a user key, as versions of a user key might be + // split into multiple files and some files might not be included + // in the merge. + bool seen_the_beginning = hit_the_next_user_key && at_bottom; + + if (seen_the_beginning) { + // do a final merge with nullptr as the existing value and say + // bye to the merge type (it's now converted to a Put) + assert(kTypeMerge == orig_ikey.type); + user_merge_operator_->Merge(orig_ikey.user_key, nullptr, operand, + &value_, logger_); + orig_ikey.type = kTypeValue; + UpdateInternalKey(&key_[0], key_.size(), + orig_ikey.sequence, orig_ikey.type); + } else { + swap(value_, operand); + } +} + +} // namespace leveldb diff --git a/db/merge_helper.h b/db/merge_helper.h new file mode 100644 index 000000000..206d7a53a --- /dev/null +++ b/db/merge_helper.h @@ -0,0 +1,64 @@ +#ifndef MERGE_HELPER_H +#define MERGE_HELPER_H + +#include "db/dbformat.h" +#include "leveldb/slice.h" +#include + +namespace leveldb { + +class Comparator; +class Iterator; +class Logger; +class MergeOperator; + +class MergeHelper { + public: + MergeHelper(const Comparator* user_comparator, + const MergeOperator* user_merge_operator, + Logger* logger, + bool assert_valid_internal_key) + : user_comparator_(user_comparator), + user_merge_operator_(user_merge_operator), + logger_(logger), + assert_valid_internal_key_(assert_valid_internal_key) {} + + // Merge entries until we hit + // - a corrupted key + // - a Put/Delete, + // - a different user key, + // - a specific sequence number (snapshot boundary), + // or - the end of iteration + // iter: (IN) points to the first merge type entry + // (OUT) points to the first entry not included in the merge process + // stop_before: (IN) a sequence number that merge should not cross. + // 0 means no restriction + // at_bottom: (IN) true if the iterator covers the bottem level, which means + // we could reach the start of the history of this user key. + void MergeUntil(Iterator* iter, SequenceNumber stop_before = 0, + bool at_bottom = false); + + // Query the merge result + // These are valid until the next MergeUtil call + // IMPORTANT: the key type could change after the MergeUntil call. + // Put/Delete + Merge + ... + Merge => Put + // Merge + ... + Merge => Merge + Slice key() { return Slice(key_); } + Slice value() { return Slice(value_); } + + private: + const Comparator* user_comparator_; + const MergeOperator* user_merge_operator_; + Logger* logger_; + Iterator* iter_; // in: the internal iterator, positioned at the first merge entry + bool assert_valid_internal_key_; // enforce no internal key corruption? + + // the scratch area that holds the result of MergeUntil + // valid up to the next MergeUntil call + std::string key_; + std::string value_; +}; + +} // namespace leveldb + +#endif diff --git a/db/merge_test.cc b/db/merge_test.cc new file mode 100644 index 000000000..2d2f6514f --- /dev/null +++ b/db/merge_test.cc @@ -0,0 +1,253 @@ +#include +#include +#include + +#include "leveldb/cache.h" +#include "leveldb/comparator.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/merge_operator.h" +#include "db/dbformat.h" +#include "utilities/merge_operators.h" + +using namespace std; +using namespace leveldb; + +auto mergeOperator = MergeOperators::CreateUInt64AddOperator(); + +std::shared_ptr OpenDb() { + DB* db; + Options options; + options.create_if_missing = true; + options.merge_operator = mergeOperator.get(); + Status s = DB::Open(options, "/tmp/testdb", &db); + if (!s.ok()) { + cerr << s.ToString() << endl; + assert(false); + } + return std::shared_ptr(db); +} + +// Imagine we are maintaining a set of uint64 counters. +// Each counter has a distinct name. And we would like +// to support four high level operations: +// set, add, get and remove +// This is a quick implementation without a Merge operation. +class Counters { + + protected: + std::shared_ptr db_; + + WriteOptions put_option_; + ReadOptions get_option_; + WriteOptions delete_option_; + + uint64_t default_; + + public: + Counters(std::shared_ptr db, uint64_t defaultCount = 0) + : db_(db), + put_option_(), + get_option_(), + delete_option_(), + default_(defaultCount) { + assert(db_); + } + + virtual ~Counters() {} + + // public interface of Counters. + // All four functions return false + // if the underlying level db operation failed. + + // mapped to a levedb Put + bool set(const string& key, uint64_t value) { + // just treat the internal rep of int64 as the string + Slice slice((char *)&value, sizeof(value)); + auto s = db_->Put(put_option_, key, slice); + + if (s.ok()) { + return true; + } else { + cerr << s.ToString() << endl; + return false; + } + } + + // mapped to a leveldb Delete + bool remove(const string& key) { + auto s = db_->Delete(delete_option_, key); + + if (s.ok()) { + return true; + } else { + cerr << s.ToString() << std::endl; + return false; + } + } + + // mapped to a leveldb Get + bool get(const string& key, uint64_t *value) { + string str; + auto s = db_->Get(get_option_, key, &str); + + if (s.IsNotFound()) { + // return default value if not found; + *value = default_; + return true; + } else if (s.ok()) { + // deserialization + if (str.size() != sizeof(uint64_t)) { + cerr << "value corruption\n"; + return false; + } + *value = DecodeFixed64(&str[0]); + return true; + } else { + cerr << s.ToString() << std::endl; + return false; + } + } + + // 'add' is implemented as get -> modify -> set + // An alternative is a single merge operation, see MergeBasedCounters + virtual bool add(const string& key, uint64_t value) { + uint64_t base = default_; + return get(key, &base) && set(key, base + value); + } + + + // convenience functions for testing + void assert_set(const string& key, uint64_t value) { + assert(set(key, value)); + } + + void assert_remove(const string& key) { + assert(remove(key)); + } + + uint64_t assert_get(const string& key) { + uint64_t value = default_; + assert(get(key, &value)); + return value; + } + + void assert_add(const string& key, uint64_t value) { + assert(add(key, value)); + } +}; + +// Implement 'add' directly with the new Merge operation +class MergeBasedCounters : public Counters { + private: + WriteOptions merge_option_; // for merge + + public: + MergeBasedCounters(std::shared_ptr db, uint64_t defaultCount = 0) + : Counters(db, defaultCount), + merge_option_() { + } + + // mapped to a leveldb Merge operation + virtual bool add(const string& key, uint64_t value) override { + char encoded[sizeof(uint64_t)]; + EncodeFixed64(encoded, value); + Slice slice(encoded, sizeof(uint64_t)); + auto s = db_->Merge(merge_option_, key, slice); + + if (s.ok()) { + return true; + } else { + cerr << s.ToString() << endl; + return false; + } + } +}; + +void dumpDb(DB* db) { + auto it = unique_ptr(db->NewIterator(ReadOptions())); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + uint64_t value = DecodeFixed64(it->value().data()); + cout << it->key().ToString() << ": " << value << endl; + } + assert(it->status().ok()); // Check for any errors found during the scan +} + +void testCounters(Counters& counters, DB* db, bool test_compaction) { + + FlushOptions o; + o.wait = true; + + counters.assert_set("a", 1); + + if (test_compaction) db->Flush(o); + + assert(counters.assert_get("a") == 1); + + counters.assert_remove("b"); + + // defaut value is 0 if non-existent + assert(counters.assert_get("b") == 0); + + counters.assert_add("a", 2); + + if (test_compaction) db->Flush(o); + + // 1+2 = 3 + assert(counters.assert_get("a")== 3); + + dumpDb(db); + + std::cout << "1\n"; + + // 1+...+49 = ? + uint64_t sum = 0; + for (int i = 1; i < 50; i++) { + counters.assert_add("b", i); + sum += i; + } + assert(counters.assert_get("b") == sum); + + std::cout << "2\n"; + dumpDb(db); + + std::cout << "3\n"; + + if (test_compaction) { + db->Flush(o); + + cout << "Compaction started ...\n"; + db->CompactRange(nullptr, nullptr); + cout << "Compaction ended\n"; + + dumpDb(db); + + assert(counters.assert_get("a")== 3); + assert(counters.assert_get("b") == sum); + } +} + +int main(int argc, char *argv[]) { + + auto db = OpenDb(); + + { + cout << "Test read-modify-write counters... \n"; + Counters counters(db, 0); + testCounters(counters, db.get(), true); + } + + bool compact = false; + if (argc > 1) { + compact = true; + cout << "Turn on Compaction\n"; + } + + { + cout << "Test merge-based counters... \n"; + MergeBasedCounters counters(db, 0); + testCounters(counters, db.get(), compact); + } + + return 0; +} diff --git a/db/table_cache.cc b/db/table_cache.cc index 729d47018..9af91fea6 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -100,7 +100,7 @@ Status TableCache::Get(const ReadOptions& options, uint64_t file_size, const Slice& k, void* arg, - void (*saver)(void*, const Slice&, const Slice&, bool), + bool (*saver)(void*, const Slice&, const Slice&, bool), bool* tableIO) { Cache::Handle* handle = nullptr; Status s = FindTable(storage_options_, file_number, file_size, diff --git a/db/table_cache.h b/db/table_cache.h index 7b5dcc04d..f4dc3c86a 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -40,13 +40,14 @@ class TableCache { Table** tableptr = nullptr); // If a seek to internal key "k" in specified file finds an entry, - // call (*handle_result)(arg, found_key, found_value). + // call (*handle_result)(arg, found_key, found_value) repeatedly until + // it returns false. Status Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const Slice& k, void* arg, - void (*handle_result)(void*, const Slice&, const Slice&, bool), + bool (*handle_result)(void*, const Slice&, const Slice&, bool), bool* tableIO); // Evict any entry for the specified file number diff --git a/db/version_set.cc b/db/version_set.cc index e86b790c2..428ceaff9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -12,6 +12,7 @@ #include "db/memtable.h" #include "db/table_cache.h" #include "leveldb/env.h" +#include "leveldb/merge_operator.h" #include "leveldb/table_builder.h" #include "table/merger.h" #include "table/two_level_iterator.h" @@ -227,29 +228,78 @@ enum SaverState { kFound, kDeleted, kCorrupt, + kMerge // value contains the current merge result (the operand) }; struct Saver { SaverState state; const Comparator* ucmp; Slice user_key; std::string* value; + const MergeOperator* merge_operator; + Logger* logger; bool didIO; // did we do any disk io? }; } -static void SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ +static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ Saver* s = reinterpret_cast(arg); ParsedInternalKey parsed_key; + // TODO: didIO and Merge? s->didIO = didIO; if (!ParseInternalKey(ikey, &parsed_key)) { + // TODO: what about corrupt during Merge? s->state = kCorrupt; } else { if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { - s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; - if (s->state == kFound) { - s->value->assign(v.data(), v.size()); + switch (parsed_key.type) { + case kTypeValue: + if (kNotFound == s->state) { + s->value->assign(v.data(), v.size()); + } else if (kMerge == s->state) { + std::string operand; + swap(operand, *s->value); + s->merge_operator->Merge(s->user_key, &v, operand, + s->value, s->logger); + } else { + assert(false); + } + s->state = kFound; + return false; + + case kTypeMerge: + if (kNotFound == s->state) { + s->state = kMerge; + s->value->assign(v.data(), v.size()); + } else if (kMerge == s->state) { + std::string operand; + swap(operand, *s->value); + s->merge_operator->Merge(s->user_key, &v, operand, + s->value, s->logger); + } else { + assert(false); + } + return true; + + case kTypeDeletion: + if (kNotFound == s->state) { + s->state = kDeleted; + } else if (kMerge == s->state) { + std::string operand; + swap(operand, *s->value); + s->merge_operator->Merge(s->user_key, nullptr, operand, + s->value, s->logger); + s->state = kFound; + } else { + assert(false); + } + return false; + } } } + + // s->state could be Corrupt, merge or notfound + + return false; } static bool NewestFirst(FileMetaData* a, FileMetaData* b) { @@ -269,14 +319,28 @@ Version::Version(VersionSet* vset, uint64_t version_number) files_ = new std::vector[vset->NumberLevels()]; } -Status Version::Get(const ReadOptions& options, - const LookupKey& k, - std::string* value, - GetStats* stats) { +void Version::Get(const ReadOptions& options, + const LookupKey& k, + std::string* value, + Status *status, + GetStats* stats, + const Options& db_options) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); - Status s; + + auto merge_operator = db_options.merge_operator; + auto logger = db_options.info_log; + + assert(status->ok() || status->IsMergeInProgress()); + Saver saver; + saver.state = status->ok()? kNotFound : kMerge; + saver.ucmp = ucmp; + saver.user_key = user_key; + saver.value = value; + saver.merge_operator = merge_operator; + saver.logger = logger.get(); + saver.didIO = false; stats->seek_file = nullptr; stats->seek_file_level = -1; @@ -325,24 +389,21 @@ Status Version::Get(const ReadOptions& options, } else { files = &tmp2; num_files = 1; + // TODO, is level 1-n files all disjoint in user key space? } } } - for (uint32_t i = 0; i < num_files; ++i) { + + for (uint32_t i = 0; i < num_files; ++i) { FileMetaData* f = files[i]; - Saver saver; - saver.state = kNotFound; - saver.ucmp = ucmp; - saver.user_key = user_key; - saver.value = value; - saver.didIO = false; bool tableIO = false; - s = vset_->table_cache_->Get(options, f->number, f->file_size, - ikey, &saver, SaveValue, &tableIO); - if (!s.ok()) { - return s; + *status = vset_->table_cache_->Get(options, f->number, f->file_size, + ikey, &saver, SaveValue, &tableIO); + // TODO: examine the behavior for corrupted key + if (!status->ok()) { + return; } if (last_file_read != nullptr && stats->seek_file == nullptr) { @@ -367,18 +428,33 @@ Status Version::Get(const ReadOptions& options, case kNotFound: break; // Keep searching in other files case kFound: - return s; + return; case kDeleted: - s = Status::NotFound(Slice()); // Use empty error message for speed - return s; + *status = Status::NotFound(Slice()); // Use empty error message for speed + return; case kCorrupt: - s = Status::Corruption("corrupted key for ", user_key); - return s; + *status = Status::Corruption("corrupted key for ", user_key); + return; + case kMerge: + break; } } } - return Status::NotFound(Slice()); // Use an empty error message for speed + + if (kMerge == saver.state) { + // merge operand is in *value and we hit the beginning of the key history + // do a final merge of nullptr and operand; + std::string operand; + swap(operand, *value); + merge_operator->Merge(user_key, nullptr, operand, + value, logger.get()); + *status = Status::OK(); + return; + } else { + *status = Status::NotFound(Slice()); // Use an empty error message for speed + return; + } } bool Version::UpdateStats(const GetStats& stats) { diff --git a/db/version_set.h b/db/version_set.h index 1484adf46..e8a611384 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -73,8 +73,8 @@ class Version { FileMetaData* seek_file; int seek_file_level; }; - Status Get(const ReadOptions&, const LookupKey& key, std::string* val, - GetStats* stats); + void Get(const ReadOptions&, const LookupKey& key, std::string* val, + Status* status, GetStats* stats, const Options& db_option); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. diff --git a/db/write_batch.cc b/db/write_batch.cc index 33f4a4257..2465c966e 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -7,7 +7,8 @@ // count: fixed32 // data: record[count] // record := -// kTypeValue varstring varstring | +// kTypeValue varstring varstring +// kTypeMerge varstring varstring // kTypeDeletion varstring // varstring := // len: varint32 @@ -20,6 +21,7 @@ #include "db/memtable.h" #include "db/write_batch_internal.h" #include "util/coding.h" +#include namespace leveldb { @@ -34,6 +36,10 @@ WriteBatch::~WriteBatch() { } WriteBatch::Handler::~Handler() { } +void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) { + throw std::runtime_error("Handler::Merge not implemented!"); +} + void WriteBatch::Clear() { rep_.clear(); rep_.resize(kHeader); @@ -68,6 +74,14 @@ Status WriteBatch::Iterate(Handler* handler) const { return Status::Corruption("bad WriteBatch Delete"); } break; + case kTypeMerge: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + handler->Merge(key, value); + } else { + return Status::Corruption("bad WriteBatch Merge"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -108,6 +122,14 @@ void WriteBatch::Delete(const Slice& key) { PutLengthPrefixedSlice(&rep_, key); } +void WriteBatch::Merge(const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); + rep_.push_back(static_cast(kTypeMerge)); + PutLengthPrefixedSlice(&rep_, key); + PutLengthPrefixedSlice(&rep_, value); +} + + namespace { class MemTableInserter : public WriteBatch::Handler { public: @@ -118,6 +140,10 @@ class MemTableInserter : public WriteBatch::Handler { mem_->Add(sequence_, kTypeValue, key, value); sequence_++; } + virtual void Merge(const Slice& key, const Slice& value) { + mem_->Add(sequence_, kTypeMerge, key, value); + sequence_++; + } virtual void Delete(const Slice& key) { mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 71a5e89cc..5a4a604bb 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -33,6 +33,14 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; + case kTypeMerge: + state.append("Merge("); + state.append(ikey.user_key.ToString()); + state.append(", "); + state.append(iter->value().ToString()); + state.append(")"); + count++; + break; case kTypeDeletion: state.append("Delete("); state.append(ikey.user_key.ToString()); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 6a6f6fc89..261e2139b 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -83,6 +83,14 @@ class DB { // Note: consider setting options.sync = true. virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; + // Merge the database entry for "key" with "value". Returns OK on success, + // and a non-OK status on error. The semantics of this operation is + // determined by the user provided merge_operator when opening DB. + // Note: consider setting options.sync = true. + virtual Status Merge(const WriteOptions& options, + const Slice& key, + const Slice& value) = 0; + // Apply the specified updates to the database. // Returns OK on success, non-OK on failure. // Note: consider setting options.sync = true. @@ -185,7 +193,7 @@ class DB { virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size) = 0; - // The sequence number of the most recent transaction. + // The sequence number of the most recent transaction. virtual SequenceNumber GetLatestSequenceNumber() = 0; // Return's an iterator for all writes since the sequence number diff --git a/include/leveldb/merge_operator.h b/include/leveldb/merge_operator.h new file mode 100644 index 000000000..d948d3c6e --- /dev/null +++ b/include/leveldb/merge_operator.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ +#define STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ + +#include + +namespace leveldb { + +class Slice; +class Logger; + +// The Merge Operator interface. +// Client needs to provide an object implementing this interface if Merge +// operation is accessed. +// Essentially, MergeOperator specifies the SEMANTICS of a merge, which only +// client knows. It could be numeric addition, list append, string +// concatenation, ... , anything. +// The library, on the other hand, is concerned with the exercise of this +// interface, at the right time (during get, iteration, compaction...) +// Note that, even though in principle we don't require any special property +// of the merge operator, the current rocksdb compaction order does imply that +// an associative operator could be exercised more naturally (and more +// efficiently). +// +// Refer to my_test.cc for an example of implementation +// +class MergeOperator { + public: + virtual ~MergeOperator() {} + + // Gives the client a way to express the read -> modify -> write semantics + // key: (IN) The key that's associated with this merge operation. + // Client could multiplex the merge operator based on it + // if the key space is partitioned and different subspaces + // refer to different types of data which have different + // merge operation semantics + // existing: (IN) null indicates that the key does not exist before this op + // value: (IN) The passed-in merge operand value (when Merge is issued) + // new_value:(OUT) Client is responsible for filling the merge result here + // logger: (IN) Client could use this to log errors during merge. + // + // Note: Merge does not return anything to indicate if a merge is successful + // or not. + // Rationale: If a merge failed due to, say de-serialization error, we still + // need to define a consistent merge result. Should we throw away + // the existing value? the merge operand? Or reset the merged value + // to sth? The rocksdb library is not in a position to make the + // right choice. On the other hand, client knows exactly what + // happened during Merge, thus is able to make the best decision. + // Just save the final decision in new_value. logger is passed in, + // in case client wants to leave a trace of what went wrong. + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const = 0; + + + // The name of the MergeOperator. Used to check for MergeOperator + // mismatches (i.e., a DB created with one MergeOperator is + // accessed using a different MergeOperator) + // TODO: the name is currently not stored persistently and thus + // no checking is enforced. Client is responsible for providing + // consistent MergeOperator between DB opens. + virtual const char* Name() const = 0; + +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_MERGE_OPERATOR_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 346f173d8..557303138 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -20,6 +20,7 @@ class Comparator; class Env; class FilterPolicy; class Logger; +class MergeOperator; class Snapshot; using std::shared_ptr; @@ -63,6 +64,18 @@ struct Options { // comparator provided to previous open calls on the same DB. const Comparator* comparator; + // REQUIRES: The client must provide a merge operator if Merge operation + // needs to be accessed. Calling Merge on a DB without a merge operator + // would result in Status::NotSupported. The client must ensure that the + // merge operator supplied here has the same name and *exactly* the same + // semantics as the merge operator provided to previous open calls on + // the same DB. The only exception is reserved for upgrade, where a DB + // previously without a merge operator is introduced to Merge operation + // for the first time. It's necessary to specify a merge operator when + // openning the DB in this case. + // Default: nullptr + const MergeOperator* merge_operator; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; diff --git a/include/leveldb/status.h b/include/leveldb/status.h index 3e4573cbe..a20351f32 100644 --- a/include/leveldb/status.h +++ b/include/leveldb/status.h @@ -47,6 +47,9 @@ class Status { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, msg, msg2); } + static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kMergeInProgress, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return (state_ == nullptr); } @@ -66,6 +69,9 @@ class Status { // Returns true iff the status indicates an IOError. bool IsIOError() const { return code() == kIOError; } + // Returns true iff the status indicates an MergeInProgress. + bool IsMergeInProgress() const { return code() == kMergeInProgress; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -84,7 +90,8 @@ class Status { kCorruption = 2, kNotSupported = 3, kInvalidArgument = 4, - kIOError = 5 + kIOError = 5, + kMergeInProgress = 6 }; Code code() const { diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 6546cadaa..a0f4e80ba 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -36,6 +36,10 @@ class WriteBatch { // Store the mapping "key->value" in the database. void Put(const Slice& key, const Slice& value); + // Merge "value" with the existing value of "key" in the database. + // "key->merge(existing, value)" + void Merge(const Slice& key, const Slice& value); + // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); @@ -47,6 +51,10 @@ class WriteBatch { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; + // Merge is not pure virtual. Otherwise, we would break existing + // clients of Handler on a source code level. + // The default implementation simply throws a runtime exception. + virtual void Merge(const Slice& key, const Slice& value); virtual void Delete(const Slice& key) = 0; }; Status Iterate(Handler* handler) const; diff --git a/table/table.cc b/table/table.cc index b9971f464..386b9e074 100644 --- a/table/table.cc +++ b/table/table.cc @@ -285,11 +285,11 @@ Iterator* Table::NewIterator(const ReadOptions& options) const { Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, - void (*saver)(void*, const Slice&, const Slice&, bool)) { + bool (*saver)(void*, const Slice&, const Slice&, bool)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); - iiter->Seek(k); - if (iiter->Valid()) { + bool done = false; + for (iiter->Seek(k); iiter->Valid() && !done; iiter->Next()) { Slice handle_value = iiter->value(); FilterBlockReader* filter = rep_->filter; BlockHandle handle; @@ -297,14 +297,20 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, handle.DecodeFrom(&handle_value).ok() && !filter->KeyMayMatch(handle.offset(), k)) { // Not found + // TODO: think about interaction with Merge. If a user key cannot + // cross one data block, we should be fine. RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL); + break; } else { bool didIO = false; Iterator* block_iter = BlockReader(this, options, iiter->value(), &didIO); - block_iter->Seek(k); - if (block_iter->Valid()) { - (*saver)(arg, block_iter->key(), block_iter->value(), didIO); + + for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) { + if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) { + done = true; + break; + } } s = block_iter->status(); delete block_iter; @@ -317,8 +323,9 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, return s; } -void SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) { +bool SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) { *reinterpret_cast(arg) = didIO; + return false; } bool Table::TEST_KeyInCache(const ReadOptions& options, const Slice& key) { // We use InternalGet() as it has logic that checks whether we read the diff --git a/table/table.h b/table/table.h index 816748ac4..bb0bd0385 100644 --- a/table/table.h +++ b/table/table.h @@ -74,14 +74,14 @@ class Table { static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, bool* didIO); - // Calls (*handle_result)(arg, ...) with the entry found after a call - // to Seek(key). May not make such a call if filter policy says - // that key is not present. + // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found + // after a call to Seek(key), until handle_result returns false. + // May not make such a call if filter policy says that key is not present. friend class TableCache; Status InternalGet( const ReadOptions&, const Slice& key, void* arg, - void (*handle_result)(void* arg, const Slice& k, const Slice& v, bool)); + bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool)); void ReadMeta(const Footer& footer); diff --git a/util/options.cc b/util/options.cc index 1d4836479..0fa80a06b 100644 --- a/util/options.cc +++ b/util/options.cc @@ -6,15 +6,17 @@ #include +#include "leveldb/cache.h" #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/filter_policy.h" -#include "leveldb/cache.h" +#include "leveldb/merge_operator.h" namespace leveldb { Options::Options() : comparator(BytewiseComparator()), + merge_operator(nullptr), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -72,7 +74,8 @@ void Options::Dump(Logger* log) const { Log(log," Options.comparator: %s", comparator->Name()); - Log(log," Options.create_if_missing: %d", create_if_missing); + Log(log," Options.merge_operator: %s", + merge_operator? merge_operator->Name() : "None"); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.paranoid_checks: %d", paranoid_checks); Log(log," Options.env: %p", env); diff --git a/util/status.cc b/util/status.cc index 5591381a1..2cd737579 100644 --- a/util/status.cc +++ b/util/status.cc @@ -58,6 +58,9 @@ std::string Status::ToString() const { case kIOError: type = "IO error: "; break; + case kMergeInProgress: + type = "Merge In Progress: "; + break; default: snprintf(tmp, sizeof(tmp), "Unknown code(%d): ", static_cast(code())); diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h new file mode 100644 index 000000000..c7c06422a --- /dev/null +++ b/utilities/merge_operators.h @@ -0,0 +1,18 @@ +#ifndef MERGE_OPERATORS_H +#define MERGE_OPERATORS_H + +#include + +#include "leveldb/merge_operator.h" + +namespace leveldb { + +class MergeOperators { + public: + static std::shared_ptr CreatePutOperator(); + static std::shared_ptr CreateUInt64AddOperator(); +}; + +} + +#endif diff --git a/utilities/merge_operators/put.cc b/utilities/merge_operators/put.cc new file mode 100644 index 000000000..9a610c4ce --- /dev/null +++ b/utilities/merge_operators/put.cc @@ -0,0 +1,35 @@ +#include +#include "leveldb/slice.h" +#include "leveldb/merge_operator.h" +#include "utilities/merge_operators.h" + +using namespace leveldb; + +namespace { // anonymous namespace + +// A merge operator that mimics Put semantics +class PutOperator : public MergeOperator { + public: + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override { + // put basically only looks at the current value + new_value->assign(value.data(), value.size()); + } + + virtual const char* Name() const override { + return "PutOperator"; + } +}; + +} // end of anonymous namespace + +namespace leveldb { + +std::shared_ptr MergeOperators::CreatePutOperator() { + return std::make_shared(); +} + +} diff --git a/utilities/merge_operators/uint64add.cc b/utilities/merge_operators/uint64add.cc new file mode 100644 index 000000000..72ef29023 --- /dev/null +++ b/utilities/merge_operators/uint64add.cc @@ -0,0 +1,63 @@ +#include +#include "leveldb/env.h" +#include "leveldb/merge_operator.h" +#include "leveldb/slice.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + + +using namespace leveldb; + +namespace { // anonymous namespace + +// A 'model' merge operator with uint64 addition semantics +class UInt64AddOperator : public MergeOperator { + public: + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override { + // assuming 0 if no existing value + uint64_t existing = 0; + if (existing_value) { + if (existing_value->size() == sizeof(uint64_t)) { + existing = DecodeFixed64(existing_value->data()); + } else { + // if existing_value is corrupted, treat it as 0 + Log(logger, "existing value corruption, size: %zu > %zu", + existing_value->size(), sizeof(uint64_t)); + existing = 0; + } + } + + uint64_t operand; + if (value.size() == sizeof(uint64_t)) { + operand = DecodeFixed64(value.data()); + } else { + // if operand is corrupted, treat it as 0 + Log(logger, "operand value corruption, size: %zu > %zu", + value.size(), sizeof(uint64_t)); + operand = 0; + } + + new_value->resize(sizeof(uint64_t)); + EncodeFixed64(&(*new_value)[0], existing + operand); + + return; + } + + virtual const char* Name() const override { + return "UInt64AddOperator"; + } +}; + +} + +namespace leveldb { + +std::shared_ptr MergeOperators::CreateUInt64AddOperator() { + return std::make_shared(); +} + +} diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index c24719582..3af366c70 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -185,6 +185,12 @@ Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { return db_->Delete(wopts, key); } +Status DBWithTTL::Merge(const WriteOptions& options, + const Slice& key, + const Slice& value) { + return Status::NotSupported("Merge operation not supported."); +} + Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { return db_->Write(opts, updates); } diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index e20c46e7c..e29a37b72 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -29,6 +29,11 @@ class DBWithTTL : public DB { virtual Status Delete(const WriteOptions& wopts, const Slice& key); + virtual Status Merge(const WriteOptions& options, + const Slice& key, + const Slice& value); + + virtual Status Write(const WriteOptions& opts, WriteBatch* updates); virtual Iterator* NewIterator(const ReadOptions& opts);