From ff463742b5fa2c97ee20abbcd8b4378c8796325f Mon Sep 17 00:00:00 2001 From: mrambacher Date: Mon, 10 May 2021 12:49:25 -0700 Subject: [PATCH] Add Merge Operator support to WriteBatchWithIndex (#8135) Summary: The WBWI has two differing modes of operation dependent on the value of the constructor parameter `overwrite_key`. Currently, regardless of the parameter, neither mode performs as expected when using Merge. This PR remedies this by correctly invoking the appropriate Merge Operator before returning results from the WBWI. Examples of issues that exist which are solved by this PR: ## Example 1 with `overwrite_key=false` Currently, from an empty database, the following sequence: ``` Put('k1', 'v1') Merge('k1', 'v2') Get('k1') ``` Incorrectly yields `v2`, that is to say that the Merge behaves like a Put. ## Example 2 with o`verwrite_key=true` Currently, from an empty database, the following sequence: ``` Put('k1', 'v1') Merge('k1', 'v2') Get('k1') ``` Incorrectly yields `ERROR: kMergeInProgress`. ## Example 3 with `overwrite_key=false` Currently, with a database containing `('k1' -> 'v1')`, the following sequence: ``` Merge('k1', 'v2') GetFromBatchAndDB('k1') ``` Incorrectly yields `v1,v2` ## Example 4 with `overwrite_key=true` Currently, with a database containing `('k1' -> 'v1')`, the following sequence: ``` Merge('k1', 'v1') GetFromBatchAndDB('k1') ``` Incorrectly yields `ERROR: kMergeInProgress`. ## Example 5 with `overwrite_key=false` Currently, from an empty database, the following sequence: ``` Put('k1', 'v1') Merge('k1', 'v2') GetFromBatchAndDB('k1') ``` Incorrectly yields `v1,v2` ## Example 6 with `overwrite_key=true` Currently, from an empty database, `('k1' -> 'v1')`, the following sequence: ``` Put('k1', 'v1') Merge('k1', 'v2') GetFromBatchAndDB('k1') ``` Incorrectly yields `ERROR: kMergeInProgress`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8135 Reviewed By: pdillinger Differential Revision: D27657938 Pulled By: mrambacher fbshipit-source-id: 0fbda6bbc66bedeba96a84786d90141d776297df --- HISTORY.md | 2 + db/merge_context.h | 28 +- utilities/transactions/transaction_base.cc | 3 +- utilities/transactions/transaction_test.cc | 9 +- .../write_batch_with_index.cc | 168 +- .../write_batch_with_index_internal.cc | 304 ++-- .../write_batch_with_index_internal.h | 60 +- .../write_batch_with_index_test.cc | 1584 ++++++++++------- 8 files changed, 1268 insertions(+), 890 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index e0e4ffec3..163c6788f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -34,6 +34,8 @@ * `CompactFiles()` can no longer compact files from lower level to up level, which has the risk to corrupt DB (details: #8063). The validation is also added to all compactions. * Fixed some cases in which DB::OpenForReadOnly() could write to the filesystem. If you want a Logger with a read-only DB, you must now set DBOptions::info_log yourself, such as using CreateLoggerFromOptions(). * get_iostats_context() will never return nullptr. If thread-local support is not available, and user does not opt-out iostats context, then compilation will fail. The same applies to perf context as well. +* Added support for WriteBatchWithIndex::NewIteratorWithBase when overwrite_key=false. Previously, this combination was not supported and would assert or return nullptr. +* Improve the behavior of WriteBatchWithIndex for Merge operations. Now more operations may be stored in order to return the correct merged result. ### Bug Fixes * Use thread-safe `strerror_r()` to get error messages. diff --git a/db/merge_context.h b/db/merge_context.h index e1869a341..925bfc0e0 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -68,7 +68,7 @@ class MergeContext { } // Get the operand at the index. - Slice GetOperand(int index) { + Slice GetOperand(int index) const { assert(operand_list_); SetDirectionForward(); @@ -76,13 +76,21 @@ class MergeContext { } // Same as GetOperandsDirectionForward - const std::vector& GetOperands() { + // + // Note that the returned reference is only good until another call + // to this MergeContext. If the returned value is needed for longer, + // a copy must be made. + const std::vector& GetOperands() const { return GetOperandsDirectionForward(); } // Return all the operands in the order as they were merged (passed to // FullMerge or FullMergeV2) - const std::vector& GetOperandsDirectionForward() { + // + // Note that the returned reference is only good until another call + // to this MergeContext. If the returned value is needed for longer, + // a copy must be made. + const std::vector& GetOperandsDirectionForward() const { if (!operand_list_) { return empty_operand_list; } @@ -93,7 +101,11 @@ class MergeContext { // Return all the operands in the reversed order relative to how they were // merged (passed to FullMerge or FullMergeV2) - const std::vector& GetOperandsDirectionBackward() { + // + // Note that the returned reference is only good until another call + // to this MergeContext. If the returned value is needed for longer, + // a copy must be made. + const std::vector& GetOperandsDirectionBackward() const { if (!operand_list_) { return empty_operand_list; } @@ -110,14 +122,14 @@ class MergeContext { } } - void SetDirectionForward() { + void SetDirectionForward() const { if (operands_reversed_ == true) { std::reverse(operand_list_->begin(), operand_list_->end()); operands_reversed_ = false; } } - void SetDirectionBackward() { + void SetDirectionBackward() const { if (operands_reversed_ == false) { std::reverse(operand_list_->begin(), operand_list_->end()); operands_reversed_ = true; @@ -125,10 +137,10 @@ class MergeContext { } // List of operands - std::unique_ptr> operand_list_; + mutable std::unique_ptr> operand_list_; // Copy of operands that are not pinned. std::unique_ptr>> copied_operands_; - bool operands_reversed_ = true; + mutable bool operands_reversed_ = true; }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 49fa99d7d..1b713acc8 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -306,7 +306,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) { Iterator* db_iter = db_->NewIterator(read_options); assert(db_iter); - return write_batch_.NewIteratorWithBase(db_iter); + return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter, + &read_options); } Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index def6d0f6b..81a1e3a61 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2814,7 +2814,8 @@ TEST_P(TransactionTest, MultiGetBatchedTest) { ASSERT_TRUE(statuses[1].IsNotFound()); ASSERT_TRUE(statuses[2].ok()); ASSERT_EQ(values[2], "val3_new"); - ASSERT_TRUE(statuses[3].IsMergeInProgress()); + ASSERT_TRUE(statuses[3].ok()); + ASSERT_EQ(values[3], "foo,bar"); ASSERT_TRUE(statuses[4].ok()); ASSERT_EQ(values[4], "val5"); ASSERT_TRUE(statuses[5].ok()); @@ -4839,7 +4840,8 @@ TEST_P(TransactionTest, MergeTest) { ASSERT_OK(s); s = txn->Get(read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(s); + ASSERT_EQ("a0,1,2", value); s = txn->Put("A", "a"); ASSERT_OK(s); @@ -4852,7 +4854,8 @@ TEST_P(TransactionTest, MergeTest) { ASSERT_OK(s); s = txn->Get(read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(s); + ASSERT_EQ("a,3", value); TransactionOptions txn_options; txn_options.lock_timeout = 1; // 1 ms diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 809d3d04c..97b565a13 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -53,13 +53,16 @@ struct WriteBatchWithIndex::Rep { // In overwrite mode, find the existing entry for the same key and update it // to point to the current entry. // Return true if the key is found and updated. - bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key); - bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key); + bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key, + WriteType type); + bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key, + WriteType type); // Add the recent entry to the update. // In overwrite mode, if key already exists in the index, update it. - void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key); - void AddOrUpdateIndex(const Slice& key); + void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key, + WriteType type); + void AddOrUpdateIndex(const Slice& key, WriteType type); // Allocate an index entry pointing to the last entry in the write batch and // put it to skip list. @@ -75,13 +78,13 @@ struct WriteBatchWithIndex::Rep { }; bool WriteBatchWithIndex::Rep::UpdateExistingEntry( - ColumnFamilyHandle* column_family, const Slice& key) { + ColumnFamilyHandle* column_family, const Slice& key, WriteType type) { uint32_t cf_id = GetColumnFamilyID(column_family); - return UpdateExistingEntryWithCfId(cf_id, key); + return UpdateExistingEntryWithCfId(cf_id, key, type); } bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( - uint32_t column_family_id, const Slice& key) { + uint32_t column_family_id, const Slice& key, WriteType type) { if (!overwrite_key) { return false; } @@ -91,9 +94,16 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( iter.Seek(key); if (!iter.Valid()) { return false; - } - if (!iter.MatchesKey(column_family_id, key)) { + } else if (!iter.MatchesKey(column_family_id, key)) { return false; + } else { + // Move to the end of this key (NextKey-Prev) + iter.NextKey(); // Move to the next key + if (iter.Valid()) { + iter.Prev(); // Move back one entry + } else { + iter.SeekToLast(); + } } WriteBatchIndexEntry* non_const_entry = const_cast(iter.GetRawEntry()); @@ -101,13 +111,17 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( last_sub_batch_offset = last_entry_offset; sub_batch_cnt++; } - non_const_entry->offset = last_entry_offset; - return true; + if (type == kMergeRecord) { + return false; + } else { + non_const_entry->offset = last_entry_offset; + return true; + } } void WriteBatchWithIndex::Rep::AddOrUpdateIndex( - ColumnFamilyHandle* column_family, const Slice& key) { - if (!UpdateExistingEntry(column_family, key)) { + ColumnFamilyHandle* column_family, const Slice& key, WriteType type) { + if (!UpdateExistingEntry(column_family, key, type)) { uint32_t cf_id = GetColumnFamilyID(column_family); const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); if (cf_cmp != nullptr) { @@ -117,8 +131,9 @@ void WriteBatchWithIndex::Rep::AddOrUpdateIndex( } } -void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { - if (!UpdateExistingEntryWithCfId(0, key)) { +void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key, + WriteType type) { + if (!UpdateExistingEntryWithCfId(0, key, type)) { AddNewEntry(0); } } @@ -190,14 +205,31 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { switch (tag) { case kTypeColumnFamilyValue: case kTypeValue: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) { + AddNewEntry(column_family_id); + } + break; case kTypeColumnFamilyDeletion: case kTypeDeletion: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key, + kDeleteRecord)) { + AddNewEntry(column_family_id); + } + break; case kTypeColumnFamilySingleDeletion: case kTypeSingleDeletion: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key, + kSingleDeleteRecord)) { + AddNewEntry(column_family_id); + } + break; case kTypeColumnFamilyMerge: case kTypeMerge: found++; - if (!UpdateExistingEntryWithCfId(column_family_id, key)) { + if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) { AddNewEntry(column_family_id); } break; @@ -255,22 +287,19 @@ WBWIIterator* WriteBatchWithIndex::NewIterator( Iterator* WriteBatchWithIndex::NewIteratorWithBase( ColumnFamilyHandle* column_family, Iterator* base_iterator, const ReadOptions* read_options) { - if (rep->overwrite_key == false) { - assert(false); - return nullptr; - } - return new BaseDeltaIterator(base_iterator, NewIterator(column_family), + auto wbwiii = + new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list), + &rep->write_batch, &rep->comparator); + return new BaseDeltaIterator(column_family, base_iterator, wbwiii, GetColumnFamilyUserComparator(column_family), read_options); } Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { - if (rep->overwrite_key == false) { - assert(false); - return nullptr; - } // default column family's comparator - return new BaseDeltaIterator(base_iterator, NewIterator(), + auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch, + &rep->comparator); + return new BaseDeltaIterator(nullptr, base_iterator, wbwiii, rep->comparator.default_comparator()); } @@ -279,7 +308,7 @@ Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.Put(column_family, key, value); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kPutRecord); } return s; } @@ -288,7 +317,7 @@ Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); auto s = rep->write_batch.Put(key, value); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kPutRecord); } return s; } @@ -298,7 +327,7 @@ Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.Delete(column_family, key); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kDeleteRecord); } return s; } @@ -307,7 +336,7 @@ Status WriteBatchWithIndex::Delete(const Slice& key) { rep->SetLastEntryOffset(); auto s = rep->write_batch.Delete(key); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kDeleteRecord); } return s; } @@ -317,7 +346,7 @@ Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.SingleDelete(column_family, key); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord); } return s; } @@ -326,7 +355,7 @@ Status WriteBatchWithIndex::SingleDelete(const Slice& key) { rep->SetLastEntryOffset(); auto s = rep->write_batch.SingleDelete(key); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kSingleDeleteRecord); } return s; } @@ -336,7 +365,7 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.Merge(column_family, key, value); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kMergeRecord); } return s; } @@ -345,7 +374,7 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); auto s = rep->write_batch.Merge(key, value); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kMergeRecord); } return s; } @@ -361,18 +390,18 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { Status s; WriteBatchWithIndexInternal wbwii(&options, column_family); - auto result = wbwii.GetFromBatch(this, key, value, rep->overwrite_key, &s); + auto result = wbwii.GetFromBatch(this, key, value, &s); switch (result) { - case WriteBatchWithIndexInternal::Result::kFound: - case WriteBatchWithIndexInternal::Result::kError: + case WBWIIteratorImpl::kFound: + case WBWIIteratorImpl::kError: // use returned status break; - case WriteBatchWithIndexInternal::Result::kDeleted: - case WriteBatchWithIndexInternal::Result::kNotFound: + case WBWIIteratorImpl::kDeleted: + case WBWIIteratorImpl::kNotFound: s = Status::NotFound(); break; - case WriteBatchWithIndexInternal::Result::kMergeInProgress: + case WBWIIteratorImpl::kMergeInProgress: s = Status::MergeInProgress(); break; default: @@ -440,29 +469,18 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( // we cannot pin it as otherwise the returned value will not be available // after the transaction finishes. std::string& batch_value = *pinnable_val->GetSelf(); - auto result = - wbwii.GetFromBatch(this, key, &batch_value, rep->overwrite_key, &s); + auto result = wbwii.GetFromBatch(this, key, &batch_value, &s); - if (result == WriteBatchWithIndexInternal::Result::kFound) { + if (result == WBWIIteratorImpl::kFound) { pinnable_val->PinSelf(); return s; - } - if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + } else if (!s.ok() || result == WBWIIteratorImpl::kError) { + return s; + } else if (result == WBWIIteratorImpl::kDeleted) { return Status::NotFound(); } - if (result == WriteBatchWithIndexInternal::Result::kError) { - return s; - } - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && - rep->overwrite_key == true) { - // Since we've overwritten keys, we do not know what other operations are - // in this batch for this key, so we cannot do a Merge to compute the - // result. Instead, we will simply return MergeInProgress. - return Status::MergeInProgress(); - } - - assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || - result == WriteBatchWithIndexInternal::Result::kNotFound); + assert(result == WBWIIteratorImpl::kMergeInProgress || + result == WBWIIteratorImpl::kNotFound); // Did not find key in batch OR could not resolve Merges. Try DB. if (!callback) { @@ -477,7 +495,7 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( } if (s.ok() || s.IsNotFound()) { // DB Get Succeeded - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) { + if (result == WBWIIteratorImpl::kMergeInProgress) { // Merge result from DB with merges in Batch std::string merge_result; if (s.ok()) { @@ -513,7 +531,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( autovector key_context; autovector sorted_keys; // To hold merges from the write batch - autovector, + autovector, MultiGetContext::MAX_BATCH_SIZE> merges; // Since the lifetime of the WriteBatch is the same as that of the transaction @@ -524,31 +542,22 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( PinnableSlice* pinnable_val = &values[i]; std::string& batch_value = *pinnable_val->GetSelf(); Status* s = &statuses[i]; - auto result = wbwii.GetFromBatch(this, keys[i], &merge_context, - &batch_value, rep->overwrite_key, s); + auto result = + wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s); - if (result == WriteBatchWithIndexInternal::Result::kFound) { + if (result == WBWIIteratorImpl::kFound) { pinnable_val->PinSelf(); continue; } - if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + if (result == WBWIIteratorImpl::kDeleted) { *s = Status::NotFound(); continue; } - if (result == WriteBatchWithIndexInternal::Result::kError) { + if (result == WBWIIteratorImpl::kError) { continue; } - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && - rep->overwrite_key == true) { - // Since we've overwritten keys, we do not know what other operations are - // in this batch for this key, so we cannot do a Merge to compute the - // result. Instead, we will simply return MergeInProgress. - *s = Status::MergeInProgress(); - continue; - } - - assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || - result == WriteBatchWithIndexInternal::Result::kNotFound); + assert(result == WBWIIteratorImpl::kMergeInProgress || + result == WBWIIteratorImpl::kNotFound); key_context.emplace_back(column_family, keys[i], &values[i], /*timestamp*/ nullptr, &statuses[i]); merges.emplace_back(result, std::move(merge_context)); @@ -569,10 +578,9 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( KeyContext& key = *iter; if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded size_t index = iter - key_context.begin(); - std::pair& - merge_result = merges[index]; - if (merge_result.first == - WriteBatchWithIndexInternal::Result::kMergeInProgress) { + std::pair& merge_result = + merges[index]; + if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) { // Merge result from DB with merges in Batch if (key.s->ok()) { *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second, diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index fb706bf88..81674cf09 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -19,8 +19,9 @@ #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { -BaseDeltaIterator::BaseDeltaIterator(Iterator* base_iterator, - WBWIIterator* delta_iterator, +BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family, + Iterator* base_iterator, + WBWIIteratorImpl* delta_iterator, const Comparator* comparator, const ReadOptions* read_options) : forward_(true), @@ -31,7 +32,9 @@ BaseDeltaIterator::BaseDeltaIterator(Iterator* base_iterator, delta_iterator_(delta_iterator), comparator_(comparator), iterate_upper_bound_(read_options ? read_options->iterate_upper_bound - : nullptr) {} + : nullptr) { + wbwii_.reset(new WriteBatchWithIndexInternal(column_family)); +} bool BaseDeltaIterator::Valid() const { return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) : false; @@ -144,8 +147,32 @@ Slice BaseDeltaIterator::key() const { } Slice BaseDeltaIterator::value() const { - return current_at_base_ ? base_iterator_->value() - : delta_iterator_->Entry().value; + if (current_at_base_) { + return base_iterator_->value(); + } else { + WriteEntry delta_entry = delta_iterator_->Entry(); + if (wbwii_->GetNumOperands() == 0) { + return delta_entry.value; + } else if (delta_entry.type == kDeleteRecord || + delta_entry.type == kSingleDeleteRecord) { + status_ = + wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf()); + } else if (delta_entry.type == kPutRecord) { + status_ = wbwii_->MergeKey(delta_entry.key, &delta_entry.value, + merge_result_.GetSelf()); + } else if (delta_entry.type == kMergeRecord) { + if (equal_keys_) { + Slice base_value = base_iterator_->value(); + status_ = wbwii_->MergeKey(delta_entry.key, &base_value, + merge_result_.GetSelf()); + } else { + status_ = + wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf()); + } + } + merge_result_.PinSelf(); + return merge_result_; + } } Status BaseDeltaIterator::status() const { @@ -228,12 +255,11 @@ void BaseDeltaIterator::Advance() { void BaseDeltaIterator::AdvanceDelta() { if (forward_) { - delta_iterator_->Next(); + delta_iterator_->NextKey(); } else { - delta_iterator_->Prev(); + delta_iterator_->PrevKey(); } } - void BaseDeltaIterator::AdvanceBase() { if (forward_) { base_iterator_->Next(); @@ -243,17 +269,18 @@ void BaseDeltaIterator::AdvanceBase() { } bool BaseDeltaIterator::BaseValid() const { return base_iterator_->Valid(); } - bool BaseDeltaIterator::DeltaValid() const { return delta_iterator_->Valid(); } - void BaseDeltaIterator::UpdateCurrent() { // Suppress false positive clang analyzer warnings. #ifndef __clang_analyzer__ status_ = Status::OK(); while (true) { + auto delta_result = WBWIIteratorImpl::kNotFound; WriteEntry delta_entry; if (DeltaValid()) { assert(delta_iterator_->status().ok()); + delta_result = + delta_iterator_->FindLatestUpdate(wbwii_->GetMergeContext()); delta_entry = delta_iterator_->Entry(); } else if (!delta_iterator_->status().ok()) { // Expose the error status and stop. @@ -279,8 +306,8 @@ void BaseDeltaIterator::UpdateCurrent() { return; } } - if (delta_entry.type == kDeleteRecord || - delta_entry.type == kSingleDeleteRecord) { + if (delta_result == WBWIIteratorImpl::kDeleted && + wbwii_->GetNumOperands() == 0) { AdvanceDelta(); } else { current_at_base_ = false; @@ -298,8 +325,8 @@ void BaseDeltaIterator::UpdateCurrent() { if (compare == 0) { equal_keys_ = true; } - if (delta_entry.type != kDeleteRecord && - delta_entry.type != kSingleDeleteRecord) { + if (delta_result != WBWIIteratorImpl::kDeleted || + wbwii_->GetNumOperands() > 0) { current_at_base_ = false; return; } @@ -319,9 +346,105 @@ void BaseDeltaIterator::UpdateCurrent() { #endif // __clang_analyzer__ } -class Env; -class Logger; -class Statistics; +void WBWIIteratorImpl::AdvanceKey(bool forward) { + if (Valid()) { + Slice key = Entry().key; + do { + if (forward) { + Next(); + } else { + Prev(); + } + } while (MatchesKey(column_family_id_, key)); + } +} + +void WBWIIteratorImpl::NextKey() { AdvanceKey(true); } + +void WBWIIteratorImpl::PrevKey() { + AdvanceKey(false); // Move to the tail of the previous key + if (Valid()) { + AdvanceKey(false); // Move back another key. Now we are at the start of + // the previous key + if (Valid()) { // Still a valid + Next(); // Move forward one onto this key + } else { + SeekToFirst(); // Not valid, move to the start + } + } +} + +WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate( + MergeContext* merge_context) { + if (Valid()) { + Slice key = Entry().key; + return FindLatestUpdate(key, merge_context); + } else { + merge_context->Clear(); // Clear any entries in the MergeContext + return WBWIIteratorImpl::kNotFound; + } +} + +WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate( + const Slice& key, MergeContext* merge_context) { + Result result = WBWIIteratorImpl::kNotFound; + merge_context->Clear(); // Clear any entries in the MergeContext + // TODO(agiardullo): consider adding support for reverse iteration + if (!Valid()) { + return result; + } else if (comparator_->CompareKey(column_family_id_, Entry().key, key) != + 0) { + return result; + } else { + // We want to iterate in the reverse order that the writes were added to the + // batch. Since we don't have a reverse iterator, we must seek past the + // end. We do this by seeking to the next key, and then back one step + NextKey(); + if (Valid()) { + Prev(); + } else { + SeekToLast(); + } + + // We are at the end of the iterator for this key. Search backwards for the + // last Put or Delete, accumulating merges along the way. + while (Valid()) { + const WriteEntry entry = Entry(); + if (comparator_->CompareKey(column_family_id_, entry.key, key) != 0) { + break; // Unexpected error or we've reached a different next key + } + + switch (entry.type) { + case kPutRecord: + return WBWIIteratorImpl::kFound; + case kDeleteRecord: + return WBWIIteratorImpl::kDeleted; + case kSingleDeleteRecord: + return WBWIIteratorImpl::kDeleted; + case kMergeRecord: + result = WBWIIteratorImpl::kMergeInProgress; + merge_context->PushOperand(entry.value); + break; + case kLogDataRecord: + break; // ignore + case kXIDRecord: + break; // ignore + default: + return WBWIIteratorImpl::kError; + } // end switch statement + Prev(); + } // End while Valid() + // At this point, we have been through the whole list and found no Puts or + // Deletes. The iterator points to the previous key. Move the iterator back + // onto this one. + if (Valid()) { + Next(); + } else { + SeekToFirst(); + } + } + return result; +} Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, @@ -479,6 +602,10 @@ bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) { } } +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + ColumnFamilyHandle* column_family) + : db_(nullptr), db_options_(nullptr), column_family_(column_family) {} + WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( DB* db, ColumnFamilyHandle* column_family) : db_(db), db_options_(nullptr), column_family_(column_family) { @@ -493,9 +620,9 @@ WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, const Slice* value, - MergeContext& merge_context, + const MergeContext& context, std::string* result, - Slice* result_operand) { + Slice* result_operand) const { if (column_family_ != nullptr) { auto cfh = static_cast_with_check(column_family_); const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get(); @@ -509,133 +636,66 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, Statistics* statistics = immutable_db_options.statistics.get(); Logger* logger = immutable_db_options.info_log.get(); SystemClock* clock = immutable_db_options.clock; - return MergeHelper::TimedFullMerge( - merge_operator, key, value, merge_context.GetOperands(), result, - logger, statistics, clock, result_operand); + return MergeHelper::TimedFullMerge(merge_operator, key, value, + context.GetOperands(), result, logger, + statistics, clock, result_operand); } else if (db_options_ != nullptr) { Statistics* statistics = db_options_->statistics.get(); Env* env = db_options_->env; Logger* logger = db_options_->info_log.get(); SystemClock* clock = env->GetSystemClock().get(); - return MergeHelper::TimedFullMerge( - merge_operator, key, value, merge_context.GetOperands(), result, - logger, statistics, clock, result_operand); + return MergeHelper::TimedFullMerge(merge_operator, key, value, + context.GetOperands(), result, logger, + statistics, clock, result_operand); } else { + const auto cf_opts = cfh->cfd()->ioptions(); return MergeHelper::TimedFullMerge( - merge_operator, key, value, merge_context.GetOperands(), result, - nullptr, nullptr, SystemClock::Default().get(), result_operand); + merge_operator, key, value, context.GetOperands(), result, + cf_opts->logger, cf_opts->stats, cf_opts->clock, result_operand); } } else { return Status::InvalidArgument("Must provide a column_family"); } } -WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( - WriteBatchWithIndex* batch, const Slice& key, MergeContext* merge_context, - std::string* value, bool overwrite_key, Status* s) { - uint32_t cf_id = GetColumnFamilyID(column_family_); +WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch( + WriteBatchWithIndex* batch, const Slice& key, MergeContext* context, + std::string* value, Status* s) { *s = Status::OK(); - Result result = kNotFound; std::unique_ptr iter( static_cast_with_check( batch->NewIterator(column_family_))); - // We want to iterate in the reverse order that the writes were added to the - // batch. Since we don't have a reverse iterator, we must seek past the end. - // TODO(agiardullo): consider adding support for reverse iteration + // Search the iterator for this key, and updates/merges to it. iter->Seek(key); - while (iter->Valid() && iter->MatchesKey(cf_id, key)) { - iter->Next(); - } - - if (!(*s).ok()) { - return WriteBatchWithIndexInternal::kError; - } - - if (!iter->Valid()) { - // Read past end of results. Reposition on last result. - iter->SeekToLast(); - } else { - iter->Prev(); - } - - Slice entry_value; - while (iter->Valid()) { - if (!iter->MatchesKey(cf_id, key)) { - // Unexpected error or we've reached a different next key - break; + auto result = iter->FindLatestUpdate(key, context); + if (result == WBWIIteratorImpl::kError) { + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + ToString(iter->Entry().type)); + return result; + } else if (result == WBWIIteratorImpl::kNotFound) { + return result; + } else if (result == WBWIIteratorImpl::Result::kFound) { // PUT + Slice entry_value = iter->Entry().value; + if (context->GetNumOperands() > 0) { + *s = MergeKey(key, &entry_value, *context, value); + if (!s->ok()) { + result = WBWIIteratorImpl::Result::kError; + } + } else { + value->assign(entry_value.data(), entry_value.size()); } - - const WriteEntry entry = iter->Entry(); - switch (entry.type) { - case kPutRecord: { - result = WriteBatchWithIndexInternal::Result::kFound; - entry_value = entry.value; - break; - } - case kMergeRecord: { - result = WriteBatchWithIndexInternal::Result::kMergeInProgress; - merge_context->PushOperand(entry.value); - break; - } - case kDeleteRecord: - case kSingleDeleteRecord: { - result = WriteBatchWithIndexInternal::Result::kDeleted; - break; - } - case kLogDataRecord: - case kXIDRecord: { - // ignore - break; - } - default: { - result = WriteBatchWithIndexInternal::Result::kError; - (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", - ToString(entry.type)); - break; - } - } - if (result == WriteBatchWithIndexInternal::Result::kFound || - result == WriteBatchWithIndexInternal::Result::kDeleted || - result == WriteBatchWithIndexInternal::Result::kError) { - // We can stop iterating once we find a PUT or DELETE - break; - } - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && - overwrite_key == true) { - // Since we've overwritten keys, we do not know what other operations are - // in this batch for this key, so we cannot do a Merge to compute the - // result. Instead, we will simply return MergeInProgress. - break; - } - - iter->Prev(); - } - - if ((*s).ok()) { - if (result == WriteBatchWithIndexInternal::Result::kFound || - result == WriteBatchWithIndexInternal::Result::kDeleted) { - // Found a Put or Delete. Merge if necessary. - if (merge_context->GetNumOperands() > 0) { - if (result == WriteBatchWithIndexInternal::Result::kFound) { - *s = MergeKey(key, &entry_value, *merge_context, value); - } else { - *s = MergeKey(key, nullptr, *merge_context, value); - } - if ((*s).ok()) { - result = WriteBatchWithIndexInternal::Result::kFound; - } else { - result = WriteBatchWithIndexInternal::Result::kError; - } - } else { // nothing to merge - if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT - value->assign(entry_value.data(), entry_value.size()); - } + } else if (result == WBWIIteratorImpl::kDeleted) { + if (context->GetNumOperands() > 0) { + *s = MergeKey(key, nullptr, *context, value); + if (s->ok()) { + result = WBWIIteratorImpl::Result::kFound; + } else { + result = WBWIIteratorImpl::Result::kError; } } } - return result; } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index c0bb81fba..60ec66e30 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -23,6 +23,8 @@ namespace ROCKSDB_NAMESPACE { class MergeContext; +class WBWIIteratorImpl; +class WriteBatchWithIndexInternal; struct Options; // when direction == forward @@ -33,7 +35,8 @@ struct Options; // * equal_keys_ <=> base_iterator == delta_iterator class BaseDeltaIterator : public Iterator { public: - BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, + BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator, + WBWIIteratorImpl* delta_iterator, const Comparator* comparator, const ReadOptions* read_options = nullptr); @@ -60,14 +63,16 @@ class BaseDeltaIterator : public Iterator { bool DeltaValid() const; void UpdateCurrent(); + std::unique_ptr wbwii_; bool forward_; bool current_at_base_; bool equal_keys_; - Status status_; + mutable Status status_; std::unique_ptr base_iterator_; - std::unique_ptr delta_iterator_; + std::unique_ptr delta_iterator_; const Comparator* comparator_; // not owned const Slice* iterate_upper_bound_; + mutable PinnableSlice merge_result_; }; // Key used by skip list, as the binary searchable index of WriteBatchWithIndex. @@ -174,6 +179,7 @@ typedef SkipList class WBWIIteratorImpl : public WBWIIterator { public: + enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; WBWIIteratorImpl(uint32_t column_family_id, WriteBatchEntrySkipList* skip_list, const ReadableWriteBatch* write_batch, @@ -245,6 +251,26 @@ class WBWIIteratorImpl : public WBWIIterator { bool MatchesKey(uint32_t cf_id, const Slice& key); + // Moves the to first entry of the previous key. + void PrevKey(); + // Moves the to first entry of the next key. + void NextKey(); + + // Moves the iterator to the Update (Put or Delete) for the current key + // If there are no Put/Delete, the Iterator will point to the first entry for + // this key + // @return kFound if a Put was found for the key + // @return kDeleted if a delete was found for the key + // @return kMergeInProgress if only merges were fouund for the key + // @return kError if an unsupported operation was found for the key + // @return kNotFound if no operations were found for this key + // + Result FindLatestUpdate(const Slice& key, MergeContext* merge_context); + Result FindLatestUpdate(MergeContext* merge_context); + + protected: + void AdvanceKey(bool forward); + private: uint32_t column_family_id_; WriteBatchEntrySkipList::Iterator skip_list_iter_; @@ -257,12 +283,12 @@ class WriteBatchWithIndexInternal { // For GetFromBatchAndDB or similar explicit WriteBatchWithIndexInternal(DB* db, ColumnFamilyHandle* column_family); + // For GetFromBatchAndDB or similar + explicit WriteBatchWithIndexInternal(ColumnFamilyHandle* column_family); // For GetFromBatch or similar explicit WriteBatchWithIndexInternal(const DBOptions* db_options, ColumnFamilyHandle* column_family); - enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; - // If batch contains a value for key, store it in *value and return kFound. // If batch contains a deletion for key, return Deleted. // If batch contains Merge operations as the most recent entry for a key, @@ -271,19 +297,25 @@ class WriteBatchWithIndexInternal { // and return kMergeInProgress // If batch does not contain this key, return kNotFound // Else, return kError on error with error Status stored in *s. - Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key, - std::string* value, bool overwrite_key, Status* s) { - return GetFromBatch(batch, key, &merge_context_, value, overwrite_key, s); + WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch, + const Slice& key, std::string* value, + Status* s) { + return GetFromBatch(batch, key, &merge_context_, value, s); } - Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key, - MergeContext* merge_context, std::string* value, - bool overwrite_key, Status* s); + WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch, + const Slice& key, + MergeContext* merge_context, + std::string* value, Status* s); Status MergeKey(const Slice& key, const Slice* value, std::string* result, - Slice* result_operand = nullptr) { + Slice* result_operand = nullptr) const { return MergeKey(key, value, merge_context_, result, result_operand); } - Status MergeKey(const Slice& key, const Slice* value, MergeContext& context, - std::string* result, Slice* result_operand = nullptr); + Status MergeKey(const Slice& key, const Slice* value, + const MergeContext& context, std::string* result, + Slice* result_operand = nullptr) const; + size_t GetNumOperands() const { return merge_context_.GetNumOperands(); } + MergeContext* GetMergeContext() { return &merge_context_; } + Slice GetOperand(int index) const { return merge_context_.GetOperand(index); } private: DB* db_; diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index dfd16866e..badfc471e 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -10,8 +10,10 @@ #ifndef ROCKSDB_LITE #include "rocksdb/utilities/write_batch_with_index.h" + #include #include + #include "db/column_family.h" #include "port/stack_trace.h" #include "test_util/testharness.h" @@ -19,6 +21,7 @@ #include "util/string_util.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { @@ -73,9 +76,241 @@ struct TestHandler : public WriteBatch::Handler { return Status::OK(); } }; + +using KVMap = std::map; + +class KVIter : public Iterator { + public: + explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} + bool Valid() const override { return iter_ != map_->end(); } + void SeekToFirst() override { iter_ = map_->begin(); } + void SeekToLast() override { + if (map_->empty()) { + iter_ = map_->end(); + } else { + iter_ = map_->find(map_->rbegin()->first); + } + } + void Seek(const Slice& k) override { + iter_ = map_->lower_bound(k.ToString()); + } + void SeekForPrev(const Slice& k) override { + iter_ = map_->upper_bound(k.ToString()); + Prev(); + } + void Next() override { ++iter_; } + void Prev() override { + if (iter_ == map_->begin()) { + iter_ = map_->end(); + return; + } + --iter_; + } + Slice key() const override { return iter_->first; } + Slice value() const override { return iter_->second; } + Status status() const override { return Status::OK(); } + + private: + const KVMap* const map_; + KVMap::const_iterator iter_; +}; + +static std::string PrintContents(WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family) { + std::string result; + + WBWIIterator* iter; + if (column_family == nullptr) { + iter = batch->NewIterator(); + } else { + iter = batch->NewIterator(column_family); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + WriteEntry e = iter->Entry(); + + if (e.type == kPutRecord) { + result.append("PUT("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kMergeRecord) { + result.append("MERGE("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kSingleDeleteRecord) { + result.append("SINGLE-DEL("); + result.append(e.key.ToString()); + result.append(")"); + } else { + assert(e.type == kDeleteRecord); + result.append("DEL("); + result.append(e.key.ToString()); + result.append(")"); + } + + result.append(","); + iter->Next(); + } + + delete iter; + return result; +} + +static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, + ColumnFamilyHandle* column_family) { + std::string result; + + Iterator* iter; + if (column_family == nullptr) { + iter = batch->NewIteratorWithBase(new KVIter(base_map)); + } else { + iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map)); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + assert(iter->status().ok()); + + Slice key = iter->key(); + Slice value = iter->value(); + + result.append(key.ToString()); + result.append(":"); + result.append(value.ToString()); + result.append(","); + + iter->Next(); + } + + delete iter; + return result; +} + +void AssertIter(Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); +} + +void AssertItersMatch(Iterator* iter1, Iterator* iter2) { + ASSERT_EQ(iter1->Valid(), iter2->Valid()); + if (iter1->Valid()) { + ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); + ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); + } +} + +void AssertItersEqual(Iterator* iter1, Iterator* iter2) { + iter1->SeekToFirst(); + iter2->SeekToFirst(); + while (iter1->Valid()) { + ASSERT_EQ(iter1->Valid(), iter2->Valid()); + ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); + ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); + iter1->Next(); + iter2->Next(); + } + ASSERT_EQ(iter1->Valid(), iter2->Valid()); +} + +void AssertIterEqual(WBWIIteratorImpl* wbwii, + const std::vector& keys) { + wbwii->SeekToFirst(); + for (auto k : keys) { + ASSERT_TRUE(wbwii->Valid()); + ASSERT_EQ(wbwii->Entry().key, k); + wbwii->NextKey(); + } + ASSERT_FALSE(wbwii->Valid()); + wbwii->SeekToLast(); + for (auto kit = keys.rbegin(); kit != keys.rend(); ++kit) { + ASSERT_TRUE(wbwii->Valid()); + ASSERT_EQ(wbwii->Entry().key, *kit); + wbwii->PrevKey(); + } + ASSERT_FALSE(wbwii->Valid()); +} } // namespace anonymous -class WriteBatchWithIndexTest : public testing::Test {}; +class WBWIBaseTest : public testing::Test { + public: + explicit WBWIBaseTest(bool overwrite) : db_(nullptr) { + options_.merge_operator = + MergeOperators::CreateFromStringId("stringappend"); + options_.create_if_missing = true; + dbname_ = test::PerThreadDBPath("write_batch_with_index_test"); + DestroyDB(dbname_, options_); + batch_.reset(new WriteBatchWithIndex(BytewiseComparator(), 20, overwrite)); + } + + virtual ~WBWIBaseTest() { + if (db_ != nullptr) { + ReleaseSnapshot(); + delete db_; + DestroyDB(dbname_, options_); + } + } + + std::string AddToBatch(ColumnFamilyHandle* cf, const std::string& key) { + std::string result; + for (size_t i = 0; i < key.size(); i++) { + if (key[i] == 'd') { + batch_->Delete(cf, key); + result = ""; + } else if (key[i] == 'p') { + result = key + ToString(i); + batch_->Put(cf, key, result); + } else if (key[i] == 'm') { + std::string value = key + ToString(i); + batch_->Merge(cf, key, value); + if (result.empty()) { + result = value; + } else { + result = result + "," + value; + } + } + } + return result; + } + + virtual Status OpenDB() { return DB::Open(options_, dbname_, &db_); } + + void ReleaseSnapshot() { + if (read_opts_.snapshot != nullptr) { + EXPECT_NE(db_, nullptr); + db_->ReleaseSnapshot(read_opts_.snapshot); + read_opts_.snapshot = nullptr; + } + } + + public: + DB* db_; + std::string dbname_; + Options options_; + WriteOptions write_opts_; + ReadOptions read_opts_; + std::unique_ptr batch_; +}; + +class WBWIKeepTest : public WBWIBaseTest { + public: + WBWIKeepTest() : WBWIBaseTest(false) {} +}; + +class WBWIOverwriteTest : public WBWIBaseTest { + public: + WBWIOverwriteTest() : WBWIBaseTest(true) {} +}; +class WriteBatchWithIndexTest : public WBWIBaseTest, + public testing::WithParamInterface { + public: + WriteBatchWithIndexTest() : WBWIBaseTest(GetParam()) {} +}; void TestValueAsSecondaryIndexHelper(std::vector entries, WriteBatchWithIndex* batch) { @@ -273,7 +508,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, } } -TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { +TEST_F(WBWIKeepTest, TestValueAsSecondaryIndex) { Entry entries[] = { {"aaa", "0005", kPutRecord}, {"b", "0002", kPutRecord}, @@ -286,12 +521,12 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { }; std::vector entries_list(entries, entries + 8); - WriteBatchWithIndex batch(nullptr, 20); + batch_.reset(new WriteBatchWithIndex(nullptr, 20, false)); - TestValueAsSecondaryIndexHelper(entries_list, &batch); + TestValueAsSecondaryIndexHelper(entries_list, batch_.get()); // Clear batch and re-run test with new values - batch.Clear(); + batch_->Clear(); Entry new_entries[] = { {"aaa", "0005", kPutRecord}, @@ -306,30 +541,29 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { entries_list = std::vector(new_entries, new_entries + 8); - TestValueAsSecondaryIndexHelper(entries_list, &batch); + TestValueAsSecondaryIndexHelper(entries_list, batch_.get()); } -TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { +TEST_P(WriteBatchWithIndexTest, TestComparatorForCF) { ColumnFamilyHandleImplDummy cf1(6, nullptr); ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20); - ASSERT_OK(batch.Put(&cf1, "ddd", "")); - ASSERT_OK(batch.Put(&cf2, "aaa", "")); - ASSERT_OK(batch.Put(&cf2, "eee", "")); - ASSERT_OK(batch.Put(&cf1, "ccc", "")); - ASSERT_OK(batch.Put(&reverse_cf, "a11", "")); - ASSERT_OK(batch.Put(&cf1, "bbb", "")); + ASSERT_OK(batch_->Put(&cf1, "ddd", "")); + ASSERT_OK(batch_->Put(&cf2, "aaa", "")); + ASSERT_OK(batch_->Put(&cf2, "eee", "")); + ASSERT_OK(batch_->Put(&cf1, "ccc", "")); + ASSERT_OK(batch_->Put(&reverse_cf, "a11", "")); + ASSERT_OK(batch_->Put(&cf1, "bbb", "")); Slice key_slices[] = {"a", "3", "3"}; Slice value_slice = ""; - ASSERT_OK(batch.Put(&reverse_cf, SliceParts(key_slices, 3), - SliceParts(&value_slice, 1))); - ASSERT_OK(batch.Put(&reverse_cf, "a22", "")); + ASSERT_OK(batch_->Put(&reverse_cf, SliceParts(key_slices, 3), + SliceParts(&value_slice, 1))); + ASSERT_OK(batch_->Put(&reverse_cf, "a22", "")); { - std::unique_ptr iter(batch.NewIterator(&cf1)); + std::unique_ptr iter(batch_->NewIterator(&cf1)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -348,7 +582,7 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { } { - std::unique_ptr iter(batch.NewIterator(&cf2)); + std::unique_ptr iter(batch_->NewIterator(&cf2)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -363,7 +597,7 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { } { - std::unique_ptr iter(batch.NewIterator(&reverse_cf)); + std::unique_ptr iter(batch_->NewIterator(&reverse_cf)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(!iter->Valid()); @@ -396,29 +630,28 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { } } -TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { +TEST_F(WBWIOverwriteTest, TestOverwriteKey) { ColumnFamilyHandleImplDummy cf1(6, nullptr); ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); - ASSERT_OK(batch.Put(&cf1, "ddd", "")); - ASSERT_OK(batch.Merge(&cf1, "ddd", "")); - ASSERT_OK(batch.Delete(&cf1, "ddd")); - ASSERT_OK(batch.Put(&cf2, "aaa", "")); - ASSERT_OK(batch.Delete(&cf2, "aaa")); - ASSERT_OK(batch.Put(&cf2, "aaa", "aaa")); - ASSERT_OK(batch.Put(&cf2, "eee", "eee")); - ASSERT_OK(batch.Put(&cf1, "ccc", "")); - ASSERT_OK(batch.Put(&reverse_cf, "a11", "")); - ASSERT_OK(batch.Delete(&cf1, "ccc")); - ASSERT_OK(batch.Put(&reverse_cf, "a33", "a33")); - ASSERT_OK(batch.Put(&reverse_cf, "a11", "a11")); + ASSERT_OK(batch_->Merge(&cf1, "ddd", "")); + ASSERT_OK(batch_->Put(&cf1, "ddd", "")); + ASSERT_OK(batch_->Delete(&cf1, "ddd")); + ASSERT_OK(batch_->Put(&cf2, "aaa", "")); + ASSERT_OK(batch_->Delete(&cf2, "aaa")); + ASSERT_OK(batch_->Put(&cf2, "aaa", "aaa")); + ASSERT_OK(batch_->Put(&cf2, "eee", "eee")); + ASSERT_OK(batch_->Put(&cf1, "ccc", "")); + ASSERT_OK(batch_->Put(&reverse_cf, "a11", "")); + ASSERT_OK(batch_->Delete(&cf1, "ccc")); + ASSERT_OK(batch_->Put(&reverse_cf, "a33", "a33")); + ASSERT_OK(batch_->Put(&reverse_cf, "a11", "a11")); Slice slices[] = {"a", "3", "3"}; - ASSERT_OK(batch.Delete(&reverse_cf, SliceParts(slices, 3))); + ASSERT_OK(batch_->Delete(&reverse_cf, SliceParts(slices, 3))); { - std::unique_ptr iter(batch.NewIterator(&cf1)); + std::unique_ptr iter(batch_->NewIterator(&cf1)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -435,7 +668,7 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { } { - std::unique_ptr iter(batch.NewIterator(&cf2)); + std::unique_ptr iter(batch_->NewIterator(&cf2)); iter->SeekToLast(); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -466,7 +699,7 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { } { - std::unique_ptr iter(batch.NewIterator(&reverse_cf)); + std::unique_ptr iter(batch_->NewIterator(&reverse_cf)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(!iter->Valid()); @@ -500,64 +733,33 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { } } -namespace { -typedef std::map KVMap; - -class KVIter : public Iterator { - public: - explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} - bool Valid() const override { return iter_ != map_->end(); } - void SeekToFirst() override { iter_ = map_->begin(); } - void SeekToLast() override { - if (map_->empty()) { - iter_ = map_->end(); - } else { - iter_ = map_->find(map_->rbegin()->first); - } - } - void Seek(const Slice& k) override { - iter_ = map_->lower_bound(k.ToString()); - } - void SeekForPrev(const Slice& k) override { - iter_ = map_->upper_bound(k.ToString()); - Prev(); - } - void Next() override { ++iter_; } - void Prev() override { - if (iter_ == map_->begin()) { - iter_ = map_->end(); - return; - } - --iter_; - } - - Slice key() const override { return iter_->first; } - Slice value() const override { return iter_->second; } - Status status() const override { return Status::OK(); } - - private: - const KVMap* const map_; - KVMap::const_iterator iter_; -}; - -void AssertIter(Iterator* iter, const std::string& key, - const std::string& value) { - ASSERT_OK(iter->status()); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(key, iter->key().ToString()); - ASSERT_EQ(value, iter->value().ToString()); +TEST_P(WriteBatchWithIndexTest, TestWBWIIterator) { + ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); + ASSERT_OK(batch_->Put(&cf1, "a", "a1")); + ASSERT_OK(batch_->Put(&cf1, "c", "c1")); + ASSERT_OK(batch_->Put(&cf1, "c", "c2")); + ASSERT_OK(batch_->Put(&cf1, "e", "e1")); + ASSERT_OK(batch_->Put(&cf1, "e", "e2")); + ASSERT_OK(batch_->Put(&cf1, "e", "e3")); + std::unique_ptr iter1( + static_cast(batch_->NewIterator(&cf1))); + std::unique_ptr iter2( + static_cast(batch_->NewIterator(&cf2))); + AssertIterEqual(iter1.get(), {"a", "c", "e"}); + AssertIterEqual(iter2.get(), {}); + ASSERT_OK(batch_->Put(&cf2, "a", "a2")); + ASSERT_OK(batch_->Merge(&cf2, "b", "b1")); + ASSERT_OK(batch_->Merge(&cf2, "b", "b2")); + ASSERT_OK(batch_->Delete(&cf2, "d")); + ASSERT_OK(batch_->Merge(&cf2, "d", "d2")); + ASSERT_OK(batch_->Merge(&cf2, "d", "d3")); + ASSERT_OK(batch_->Delete(&cf2, "f")); + AssertIterEqual(iter1.get(), {"a", "c", "e"}); + AssertIterEqual(iter2.get(), {"a", "b", "d", "f"}); } -void AssertItersEqual(Iterator* iter1, Iterator* iter2) { - ASSERT_EQ(iter1->Valid(), iter2->Valid()); - if (iter1->Valid()) { - ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); - ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); - } -} -} // namespace - -TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { +TEST_P(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { std::vector source_strings = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}; for (int rand_seed = 301; rand_seed < 366; rand_seed++) { @@ -566,14 +768,13 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator()); - - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + batch_->Clear(); if (rand_seed % 2 == 0) { - ASSERT_OK(batch.Put(&cf2, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf2, "zoo", "bar")); } if (rand_seed % 4 == 1) { - ASSERT_OK(batch.Put(&cf3, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf3, "zoo", "bar")); } KVMap map; @@ -589,24 +790,24 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { break; case 1: // only delta has it - ASSERT_OK(batch.Put(&cf1, key, value)); + ASSERT_OK(batch_->Put(&cf1, key, value)); map[key] = value; merged_map[key] = value; break; case 2: // both has it. Delta should win - ASSERT_OK(batch.Put(&cf1, key, value)); + ASSERT_OK(batch_->Put(&cf1, key, value)); map[key] = "wrong_value"; merged_map[key] = value; break; case 3: // both has it. Delta is delete - ASSERT_OK(batch.Delete(&cf1, key)); + ASSERT_OK(batch_->Delete(&cf1, key)); map[key] = "wrong_value"; break; case 4: // only delta has it. Delta is delete - ASSERT_OK(batch.Delete(&cf1, key)); + ASSERT_OK(batch_->Delete(&cf1, key)); map[key] = "wrong_value"; break; default: @@ -616,7 +817,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { } std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); std::unique_ptr result_iter(new KVIter(&merged_map)); bool is_valid = false; @@ -672,7 +873,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { } break; } - AssertItersEqual(iter.get(), result_iter.get()); + AssertItersMatch(iter.get(), result_iter.get()); is_valid = iter->Valid(); } @@ -680,18 +881,16 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { } } -TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { +TEST_P(WriteBatchWithIndexTest, TestIteraratorWithBase) { ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); - { KVMap map; map["a"] = "aa"; map["c"] = "cc"; map["e"] = "ee"; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -724,12 +923,12 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { } // Test the case that there is one element in the write batch - ASSERT_OK(batch.Put(&cf2, "zoo", "bar")); - ASSERT_OK(batch.Put(&cf1, "a", "aa")); + ASSERT_OK(batch_->Put(&cf2, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf1, "a", "aa")); { KVMap empty_map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -738,10 +937,10 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { ASSERT_TRUE(!iter->Valid()); } - ASSERT_OK(batch.Delete(&cf1, "b")); - ASSERT_OK(batch.Put(&cf1, "c", "cc")); - ASSERT_OK(batch.Put(&cf1, "d", "dd")); - ASSERT_OK(batch.Delete(&cf1, "e")); + ASSERT_OK(batch_->Delete(&cf1, "b")); + ASSERT_OK(batch_->Put(&cf1, "c", "cc")); + ASSERT_OK(batch_->Put(&cf1, "d", "dd")); + ASSERT_OK(batch_->Delete(&cf1, "e")); { KVMap map; @@ -749,7 +948,7 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { map["cc"] = "cccc"; map["f"] = "ff"; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -807,7 +1006,7 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { { KVMap empty_map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -843,18 +1042,17 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { } } -TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { +TEST_P(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator()); ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); // Test the case that there is one element in the write batch - ASSERT_OK(batch.Put(&cf2, "zoo", "bar")); - ASSERT_OK(batch.Put(&cf1, "a", "aa")); + ASSERT_OK(batch_->Put(&cf2, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf1, "a", "aa")); { KVMap empty_map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -863,11 +1061,11 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { ASSERT_TRUE(!iter->Valid()); } - ASSERT_OK(batch.Put(&cf1, "c", "cc")); + ASSERT_OK(batch_->Put(&cf1, "c", "cc")); { KVMap map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "c", "cc"); @@ -896,11 +1094,12 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { } // default column family - ASSERT_OK(batch.Put("a", "b")); + ASSERT_OK(batch_->Put("a", "b")); { KVMap map; map["b"] = ""; - std::unique_ptr iter(batch.NewIteratorWithBase(new KVIter(&map))); + std::unique_ptr iter( + batch_->NewIteratorWithBase(new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "b"); @@ -929,416 +1128,288 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { } } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatch) { +TEST_P(WriteBatchWithIndexTest, TestGetFromBatch) { Options options; - WriteBatchWithIndex batch; Status s; std::string value; - s = batch.GetFromBatch(options, "b", &value); + s = batch_->GetFromBatch(options_, "b", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Put("a", "a")); - ASSERT_OK(batch.Put("b", "b")); - ASSERT_OK(batch.Put("c", "c")); - ASSERT_OK(batch.Put("a", "z")); - ASSERT_OK(batch.Delete("c")); - ASSERT_OK(batch.Delete("d")); - ASSERT_OK(batch.Delete("e")); - ASSERT_OK(batch.Put("e", "e")); + ASSERT_OK(batch_->Put("a", "a")); + ASSERT_OK(batch_->Put("b", "b")); + ASSERT_OK(batch_->Put("c", "c")); + ASSERT_OK(batch_->Put("a", "z")); + ASSERT_OK(batch_->Delete("c")); + ASSERT_OK(batch_->Delete("d")); + ASSERT_OK(batch_->Delete("e")); + ASSERT_OK(batch_->Put("e", "e")); - s = batch.GetFromBatch(options, "b", &value); + s = batch_->GetFromBatch(options_, "b", &value); ASSERT_OK(s); ASSERT_EQ("b", value); - s = batch.GetFromBatch(options, "a", &value); + s = batch_->GetFromBatch(options_, "a", &value); ASSERT_OK(s); ASSERT_EQ("z", value); - s = batch.GetFromBatch(options, "c", &value); + s = batch_->GetFromBatch(options_, "c", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(options, "d", &value); + s = batch_->GetFromBatch(options_, "d", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(options, "x", &value); + s = batch_->GetFromBatch(options_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(options, "e", &value); + s = batch_->GetFromBatch(options_, "e", &value); ASSERT_OK(s); ASSERT_EQ("e", value); - ASSERT_OK(batch.Merge("z", "z")); + ASSERT_OK(batch_->Merge("z", "z")); - s = batch.GetFromBatch(options, "z", &value); + s = batch_->GetFromBatch(options_, "z", &value); ASSERT_NOK(s); // No merge operator specified. - s = batch.GetFromBatch(options, "b", &value); + s = batch_->GetFromBatch(options_, "b", &value); ASSERT_OK(s); ASSERT_EQ("b", value); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { - DB* db; - Options options; - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - options.create_if_missing = true; - - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchMerge) { + Status s = OpenDB(); ASSERT_OK(s); - ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); - WriteBatchWithIndex batch; + ColumnFamilyHandle* column_family = db_->DefaultColumnFamily(); std::string value; - s = batch.GetFromBatch(options, "x", &value); + s = batch_->GetFromBatch(options_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Put("x", "X")); + ASSERT_OK(batch_->Put("x", "X")); std::string expected = "X"; for (int i = 0; i < 5; i++) { - ASSERT_OK(batch.Merge("x", ToString(i))); + ASSERT_OK(batch_->Merge("x", ToString(i))); expected = expected + "," + ToString(i); if (i % 2 == 0) { - ASSERT_OK(batch.Put("y", ToString(i / 2))); + ASSERT_OK(batch_->Put("y", ToString(i / 2))); } - ASSERT_OK(batch.Merge("z", "z")); + ASSERT_OK(batch_->Merge("z", "z")); - s = batch.GetFromBatch(column_family, options, "x", &value); + s = batch_->GetFromBatch(column_family, options_, "x", &value); ASSERT_OK(s); ASSERT_EQ(expected, value); - s = batch.GetFromBatch(column_family, options, "y", &value); + s = batch_->GetFromBatch(column_family, options_, "y", &value); ASSERT_OK(s); ASSERT_EQ(ToString(i / 2), value); - s = batch.GetFromBatch(column_family, options, "z", &value); + s = batch_->GetFromBatch(column_family, options_, "z", &value); ASSERT_TRUE(s.IsMergeInProgress()); } - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) { - DB* db; - Options options; - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - options.create_if_missing = true; - - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_F(WBWIOverwriteTest, TestGetFromBatchMerge2) { + Status s = OpenDB(); ASSERT_OK(s); - ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); - - // Test batch with overwrite_key=true - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + ColumnFamilyHandle* column_family = db_->DefaultColumnFamily(); std::string value; - s = batch.GetFromBatch(column_family, options, "X", &value); + s = batch_->GetFromBatch(column_family, options_, "X", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Put(column_family, "X", "x")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->Put(column_family, "X", "x")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); ASSERT_EQ("x", value); - ASSERT_OK(batch.Put(column_family, "X", "x2")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->Put(column_family, "X", "x2")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); ASSERT_EQ("x2", value); - ASSERT_OK(batch.Merge(column_family, "X", "aaa")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge(column_family, "X", "aaa")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("x2,aaa", value); - ASSERT_OK(batch.Merge(column_family, "X", "bbb")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge(column_family, "X", "bbb")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("x2,aaa,bbb", value); - ASSERT_OK(batch.Put(column_family, "X", "x3")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->Put(column_family, "X", "x3")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); ASSERT_EQ("x3", value); - ASSERT_OK(batch.Merge(column_family, "X", "ccc")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge(column_family, "X", "ccc")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("x3,ccc", value); - ASSERT_OK(batch.Delete(column_family, "X")); - s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(batch_->Delete(column_family, "X")); + s = batch_->GetFromBatch(column_family, options_, "X", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Merge(column_family, "X", "ddd")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); + batch_->Merge(column_family, "X", "ddd"); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("ddd", value); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { - DB* db; - Options options; - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { + ASSERT_OK(OpenDB()); - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); - ASSERT_OK(s); - - WriteBatchWithIndex batch; - ReadOptions read_options; - WriteOptions write_options; std::string value; - s = db->Put(write_options, "a", "a"); - ASSERT_OK(s); + ASSERT_OK(db_->Put(write_opts_, "a", "a")); + ASSERT_OK(db_->Put(write_opts_, "b", "b")); + ASSERT_OK(db_->Put(write_opts_, "c", "c")); - s = db->Put(write_options, "b", "b"); - ASSERT_OK(s); + ASSERT_OK(batch_->Put("a", "batch_->a")); + ASSERT_OK(batch_->Delete("b")); - s = db->Put(write_options, "c", "c"); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); + ASSERT_EQ("batch_->a", value); - ASSERT_OK(batch.Put("a", "batch.a")); - ASSERT_OK(batch.Delete("b")); - - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); - ASSERT_OK(s); - ASSERT_EQ("batch.a", value); - - s = batch.GetFromBatchAndDB(db, read_options, "b", &value); + Status s = batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatchAndDB(db, read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); ASSERT_EQ("c", value); - s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(db->Delete(write_options, "x")); + ASSERT_OK(db_->Delete(write_opts_, "x")); - s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { - DB* db; - Options options; - - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { + Status s = OpenDB(); ASSERT_OK(s); - WriteBatchWithIndex batch; - ReadOptions read_options; - WriteOptions write_options; std::string value; - s = db->Put(write_options, "a", "a0"); - ASSERT_OK(s); + ASSERT_OK(db_->Put(write_opts_, "a", "a0")); + ASSERT_OK(db_->Put(write_opts_, "b", "b0")); + ASSERT_OK(db_->Merge(write_opts_, "b", "b1")); + ASSERT_OK(db_->Merge(write_opts_, "c", "c0")); + ASSERT_OK(db_->Merge(write_opts_, "d", "d0")); - s = db->Put(write_options, "b", "b0"); - ASSERT_OK(s); + ASSERT_OK(batch_->Merge("a", "a1")); + ASSERT_OK(batch_->Merge("a", "a2")); + ASSERT_OK(batch_->Merge("b", "b2")); + ASSERT_OK(batch_->Merge("d", "d1")); + ASSERT_OK(batch_->Merge("e", "e0")); - s = db->Merge(write_options, "b", "b1"); - ASSERT_OK(s); - - s = db->Merge(write_options, "c", "c0"); - ASSERT_OK(s); - - s = db->Merge(write_options, "d", "d0"); - ASSERT_OK(s); - - ASSERT_OK(batch.Merge("a", "a1")); - ASSERT_OK(batch.Merge("a", "a2")); - ASSERT_OK(batch.Merge("b", "b2")); - ASSERT_OK(batch.Merge("d", "d1")); - ASSERT_OK(batch.Merge("e", "e0")); - - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); ASSERT_EQ("a0,a1,a2", value); - s = batch.GetFromBatchAndDB(db, read_options, "b", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value)); ASSERT_EQ("b0,b1,b2", value); - s = batch.GetFromBatchAndDB(db, read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); ASSERT_EQ("c0", value); - s = batch.GetFromBatchAndDB(db, read_options, "d", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "d", &value)); ASSERT_EQ("d0,d1", value); - s = batch.GetFromBatchAndDB(db, read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); ASSERT_EQ("e0", value); - s = db->Delete(write_options, "x"); - ASSERT_OK(s); + ASSERT_OK(db_->Delete(write_opts_, "x")); - s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - const Snapshot* snapshot = db->GetSnapshot(); + const Snapshot* snapshot = db_->GetSnapshot(); ReadOptions snapshot_read_options; snapshot_read_options.snapshot = snapshot; - s = db->Delete(write_options, "a"); - ASSERT_OK(s); + ASSERT_OK(db_->Delete(write_opts_, "a")); - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); ASSERT_EQ("a1,a2", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); - ASSERT_OK(s); + ASSERT_OK( + s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "a", &value)); ASSERT_EQ("a0,a1,a2", value); - ASSERT_OK(batch.Delete("a")); + ASSERT_OK(batch_->Delete("a")); - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); + s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "a", &value); ASSERT_TRUE(s.IsNotFound()); - s = db->Merge(write_options, "c", "c1"); - ASSERT_OK(s); + ASSERT_OK(s = db_->Merge(write_opts_, "c", "c1")); - s = batch.GetFromBatchAndDB(db, read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK(s = batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); ASSERT_EQ("c0,c1", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK( + s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "c", &value)); ASSERT_EQ("c0", value); - s = db->Put(write_options, "e", "e1"); - ASSERT_OK(s); - - s = batch.GetFromBatchAndDB(db, read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(db_->Put(write_opts_, "e", "e1")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); ASSERT_EQ("e1,e0", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, snapshot_read_options, "e", &value)); ASSERT_EQ("e0", value); - s = db->Delete(write_options, "e"); - ASSERT_OK(s); - - s = batch.GetFromBatchAndDB(db, read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(s = db_->Delete(write_opts_, "e")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); ASSERT_EQ("e0", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, snapshot_read_options, "e", &value)); ASSERT_EQ("e0", value); - db->ReleaseSnapshot(snapshot); - delete db; - EXPECT_OK(DestroyDB(dbname, options)); + db_->ReleaseSnapshot(snapshot); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) { - DB* db; - Options options; - - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_F(WBWIOverwriteTest, TestGetFromBatchAndDBMerge2) { + Status s = OpenDB(); ASSERT_OK(s); - // Test batch with overwrite_key=true - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); - - ReadOptions read_options; - WriteOptions write_options; std::string value; - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Merge("A", "xxx")); + ASSERT_OK(batch_->Merge("A", "xxx")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); + ASSERT_EQ(value, "xxx"); - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge("A", "yyy")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); + ASSERT_EQ(value, "xxx,yyy"); - ASSERT_OK(batch.Merge("A", "yyy")); + ASSERT_OK(db_->Put(write_opts_, "A", "a0")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); + ASSERT_EQ(value, "a0,xxx,yyy"); - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Delete("A")); - s = db->Put(write_options, "A", "a0"); - ASSERT_OK(s); - - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); - - ASSERT_OK(batch.Delete("A")); - - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) { - DB* db; - Options options; - - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) { + Status s = OpenDB(); ASSERT_OK(s); - ReadOptions read_options; - WriteOptions write_options; FlushOptions flush_options; std::string value; - WriteBatchWithIndex batch; + ASSERT_OK(db_->Put(write_opts_, "A", "1")); + ASSERT_OK(db_->Flush(flush_options, db_->DefaultColumnFamily())); + ASSERT_OK(batch_->Merge("A", "2")); - ASSERT_OK(db->Put(write_options, "A", "1")); - ASSERT_OK(db->Flush(flush_options, db->DefaultColumnFamily())); - ASSERT_OK(batch.Merge("A", "2")); - - ASSERT_OK(batch.GetFromBatchAndDB(db, read_options, "A", &value)); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); ASSERT_EQ(value, "1,2"); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } void AssertKey(std::string key, WBWIIterator* iter) { @@ -1353,25 +1424,24 @@ void AssertValue(std::string value, WBWIIterator* iter) { // Tests that we can write to the WBWI while we iterate (from a single thread). // iteration should see the newest writes -TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingCorrectnessTest) { - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); +TEST_F(WBWIOverwriteTest, MutateWhileIteratingCorrectnessTest) { for (char c = 'a'; c <= 'z'; ++c) { - ASSERT_OK(batch.Put(std::string(1, c), std::string(1, c))); + ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c))); } - std::unique_ptr iter(batch.NewIterator()); + std::unique_ptr iter(batch_->NewIterator()); iter->Seek("k"); AssertKey("k", iter.get()); iter->Next(); AssertKey("l", iter.get()); - ASSERT_OK(batch.Put("ab", "cc")); + ASSERT_OK(batch_->Put("ab", "cc")); iter->Next(); AssertKey("m", iter.get()); - ASSERT_OK(batch.Put("mm", "kk")); + ASSERT_OK(batch_->Put("mm", "kk")); iter->Next(); AssertKey("mm", iter.get()); AssertValue("kk", iter.get()); - ASSERT_OK(batch.Delete("mm")); + ASSERT_OK(batch_->Delete("mm")); iter->Next(); AssertKey("n", iter.get()); @@ -1381,7 +1451,7 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingCorrectnessTest) { iter->Seek("ab"); AssertKey("ab", iter.get()); - ASSERT_OK(batch.Delete("x")); + ASSERT_OK(batch_->Delete("x")); iter->Seek("x"); AssertKey("x", iter.get()); ASSERT_EQ(kDeleteRecord, iter->Entry().type); @@ -1400,10 +1470,10 @@ void AssertIterValue(std::string value, Iterator* iter) { } // same thing as above, but testing IteratorWithBase -TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { +TEST_F(WBWIOverwriteTest, MutateWhileIteratingBaseCorrectnessTest) { WriteBatchWithIndex batch(BytewiseComparator(), 0, true); for (char c = 'a'; c <= 'z'; ++c) { - ASSERT_OK(batch.Put(std::string(1, c), std::string(1, c))); + ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c))); } KVMap map; @@ -1412,20 +1482,19 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { map["ee"] = "ee"; map["em"] = "me"; - std::unique_ptr iter( - batch.NewIteratorWithBase(new KVIter(&map))); + std::unique_ptr iter(batch_->NewIteratorWithBase(new KVIter(&map))); iter->Seek("k"); AssertIterKey("k", iter.get()); iter->Next(); AssertIterKey("l", iter.get()); - ASSERT_OK(batch.Put("ab", "cc")); + ASSERT_OK(batch_->Put("ab", "cc")); iter->Next(); AssertIterKey("m", iter.get()); - ASSERT_OK(batch.Put("mm", "kk")); + ASSERT_OK(batch_->Put("mm", "kk")); iter->Next(); AssertIterKey("mm", iter.get()); AssertIterValue("kk", iter.get()); - ASSERT_OK(batch.Delete("mm")); + ASSERT_OK(batch_->Delete("mm")); iter->Next(); AssertIterKey("n", iter.get()); iter->Prev(); @@ -1438,13 +1507,13 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { AssertIterKey("aa", iter.get()); iter->Prev(); AssertIterKey("a", iter.get()); - ASSERT_OK(batch.Delete("aa")); + ASSERT_OK(batch_->Delete("aa")); iter->Next(); AssertIterKey("ab", iter.get()); iter->Prev(); AssertIterKey("a", iter.get()); - ASSERT_OK(batch.Delete("x")); + ASSERT_OK(batch_->Delete("x")); iter->Seek("x"); AssertIterKey("y", iter.get()); iter->Next(); @@ -1453,11 +1522,11 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { iter->Prev(); AssertIterKey("w", iter.get()); - ASSERT_OK(batch.Delete("e")); + ASSERT_OK(batch_->Delete("e")); iter->Seek("e"); AssertIterKey("ee", iter.get()); AssertIterValue("ee", iter.get()); - ASSERT_OK(batch.Put("ee", "xx")); + ASSERT_OK(batch_->Put("ee", "xx")); // still the same value AssertIterValue("ee", iter.get()); iter->Next(); @@ -1470,10 +1539,9 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { } // stress testing mutations with IteratorWithBase -TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); +TEST_F(WBWIOverwriteTest, MutateWhileIteratingBaseStressTest) { for (char c = 'a'; c <= 'z'; ++c) { - ASSERT_OK(batch.Put(std::string(1, c), std::string(1, c))); + ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c))); } KVMap map; @@ -1481,8 +1549,7 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { map[std::string(2, c)] = std::string(2, c); } - std::unique_ptr iter( - batch.NewIteratorWithBase(new KVIter(&map))); + std::unique_ptr iter(batch_->NewIteratorWithBase(new KVIter(&map))); Random rnd(301); for (int i = 0; i < 1000000; ++i) { @@ -1490,16 +1557,16 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { char c = static_cast(rnd.Uniform(26) + 'a'); switch (random) { case 0: - ASSERT_OK(batch.Put(std::string(1, c), "xxx")); + ASSERT_OK(batch_->Put(std::string(1, c), "xxx")); break; case 1: - ASSERT_OK(batch.Put(std::string(2, c), "xxx")); + ASSERT_OK(batch_->Put(std::string(2, c), "xxx")); break; case 2: - ASSERT_OK(batch.Delete(std::string(1, c))); + ASSERT_OK(batch_->Delete(std::string(1, c))); break; case 3: - ASSERT_OK(batch.Delete(std::string(2, c))); + ASSERT_OK(batch_->Delete(std::string(2, c))); break; case 4: iter->Seek(std::string(1, c)); @@ -1524,330 +1591,523 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { ASSERT_OK(iter->status()); } -static void PrintContents(WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family, - std::string* result) { - WBWIIterator* iter; - if (column_family == nullptr) { - iter = batch->NewIterator(); - } else { - iter = batch->NewIterator(column_family); - } - - iter->SeekToFirst(); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - - WriteEntry e = iter->Entry(); - - if (e.type == kPutRecord) { - result->append("PUT("); - result->append(e.key.ToString()); - result->append("):"); - result->append(e.value.ToString()); - } else if (e.type == kMergeRecord) { - result->append("MERGE("); - result->append(e.key.ToString()); - result->append("):"); - result->append(e.value.ToString()); - } else if (e.type == kSingleDeleteRecord) { - result->append("SINGLE-DEL("); - result->append(e.key.ToString()); - result->append(")"); - } else { - assert(e.type == kDeleteRecord); - result->append("DEL("); - result->append(e.key.ToString()); - result->append(")"); - } - - result->append(","); - iter->Next(); - } - - ASSERT_OK(iter->status()); - - delete iter; -} - -static std::string PrintContents(WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family) { - std::string result; - PrintContents(batch, column_family, &result); - return result; -} - -static void PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, - ColumnFamilyHandle* column_family, - std::string* result) { - Iterator* iter; - if (column_family == nullptr) { - iter = batch->NewIteratorWithBase(new KVIter(base_map)); - } else { - iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map)); - } - - iter->SeekToFirst(); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - - Slice key = iter->key(); - Slice value = iter->value(); - - result->append(key.ToString()); - result->append(":"); - result->append(value.ToString()); - result->append(","); - - iter->Next(); - } - - ASSERT_OK(iter->status()); - - delete iter; -} - -static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, - ColumnFamilyHandle* column_family) { - std::string result; - PrintContents(batch, base_map, column_family, &result); - return result; -} - -TEST_F(WriteBatchWithIndexTest, SavePointTest) { - WriteBatchWithIndex batch; +TEST_P(WriteBatchWithIndexTest, SavePointTest) { ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + KVMap empty_map; + std::unique_ptr cf0_iter( + batch_->NewIteratorWithBase(new KVIter(&empty_map))); + std::unique_ptr cf1_iter( + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); Status s; + KVMap kvm_cf0_0 = {{"A", "aa"}, {"B", "b"}}; + KVMap kvm_cf1_0 = {{"A", "a1"}, {"C", "c1"}, {"E", "e1"}}; + KVIter kvi_cf0_0(&kvm_cf0_0); + KVIter kvi_cf1_0(&kvm_cf1_0); - ASSERT_OK(batch.Put("A", "a")); - ASSERT_OK(batch.Put("B", "b")); - ASSERT_OK(batch.Put("A", "aa")); - ASSERT_OK(batch.Put(&cf1, "A", "a1")); - ASSERT_OK(batch.Delete(&cf1, "B")); - ASSERT_OK(batch.Put(&cf1, "C", "c1")); - ASSERT_OK(batch.Put(&cf1, "E", "e1")); + ASSERT_OK(batch_->Put("A", "a")); + ASSERT_OK(batch_->Put("B", "b")); + ASSERT_OK(batch_->Put("A", "aa")); + ASSERT_OK(batch_->Put(&cf1, "A", "a1")); + ASSERT_OK(batch_->Delete(&cf1, "B")); + ASSERT_OK(batch_->Put(&cf1, "C", "c1")); + ASSERT_OK(batch_->Put(&cf1, "E", "e1")); - batch.SetSavePoint(); // 1 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_0); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_0); + batch_->SetSavePoint(); // 1 - ASSERT_OK(batch.Put("C", "cc")); - ASSERT_OK(batch.Put("B", "bb")); - ASSERT_OK(batch.Delete("A")); - ASSERT_OK(batch.Put(&cf1, "B", "b1")); - ASSERT_OK(batch.Delete(&cf1, "A")); - ASSERT_OK(batch.SingleDelete(&cf1, "E")); - batch.SetSavePoint(); // 2 + KVMap kvm_cf0_1 = {{"B", "bb"}, {"C", "cc"}}; + KVMap kvm_cf1_1 = {{"B", "b1"}, {"C", "c1"}}; + KVIter kvi_cf0_1(&kvm_cf0_1); + KVIter kvi_cf1_1(&kvm_cf1_1); - ASSERT_OK(batch.Put("A", "aaa")); - ASSERT_OK(batch.Put("A", "xxx")); - ASSERT_OK(batch.Delete("B")); - ASSERT_OK(batch.Put(&cf1, "B", "b2")); - ASSERT_OK(batch.Delete(&cf1, "C")); - batch.SetSavePoint(); // 3 - batch.SetSavePoint(); // 4 - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.Delete(&cf1, "D")); - ASSERT_OK(batch.Delete(&cf1, "E")); + ASSERT_OK(batch_->Put("C", "cc")); + ASSERT_OK(batch_->Put("B", "bb")); + ASSERT_OK(batch_->Delete("A")); + ASSERT_OK(batch_->Put(&cf1, "B", "b1")); + ASSERT_OK(batch_->Delete(&cf1, "A")); + ASSERT_OK(batch_->SingleDelete(&cf1, "E")); + batch_->SetSavePoint(); // 2 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_1); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_1); - ASSERT_EQ( - "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" - "B)" - ",PUT(C):cc,SINGLE-DEL(D),", - PrintContents(&batch, nullptr)); + KVMap kvm_cf0_2 = {{"A", "xxx"}, {"C", "cc"}}; + KVMap kvm_cf1_2 = {{"B", "b2"}}; + KVIter kvi_cf0_2(&kvm_cf0_2); + KVIter kvi_cf1_2(&kvm_cf1_2); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," - "DEL(D),PUT(E):e1,SINGLE-DEL(E),DEL(E),", - PrintContents(&batch, &cf1)); + ASSERT_OK(batch_->Put("A", "aaa")); + ASSERT_OK(batch_->Put("A", "xxx")); + ASSERT_OK(batch_->Delete("B")); + ASSERT_OK(batch_->Put(&cf1, "B", "b2")); + ASSERT_OK(batch_->Delete(&cf1, "C")); + batch_->SetSavePoint(); // 3 + batch_->SetSavePoint(); // 4 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_2); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_2); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 4 - ASSERT_EQ( - "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" - "B)" - ",PUT(C):cc,", - PrintContents(&batch, nullptr)); + KVMap kvm_cf0_4 = {{"A", "xxx"}, {"C", "cc"}}; + KVMap kvm_cf1_4 = {{"B", "b2"}}; + KVIter kvi_cf0_4(&kvm_cf0_4); + KVIter kvi_cf1_4(&kvm_cf1_4); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->Delete(&cf1, "D")); + ASSERT_OK(batch_->Delete(&cf1, "E")); + AssertItersEqual(cf0_iter.get(), &kvi_cf0_4); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_4); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 4 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_2); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_2); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 3 - ASSERT_EQ( - "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" - "B)" - ",PUT(C):cc,", - PrintContents(&batch, nullptr)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 3 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_2); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_2); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 2 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_1); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_1); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 2 - ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", - PrintContents(&batch, nullptr)); + batch_->SetSavePoint(); // 5 + ASSERT_OK(batch_->Put("X", "x")); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); + KVMap kvm_cf0_5 = {{"B", "bb"}, {"C", "cc"}, {"X", "x"}}; + KVIter kvi_cf0_5(&kvm_cf0_5); + KVIter kvi_cf1_5(&kvm_cf1_1); + AssertItersEqual(cf0_iter.get(), &kvi_cf0_5); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_5); - batch.SetSavePoint(); // 5 - ASSERT_OK(batch.Put("X", "x")); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 5 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_1); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_1); - ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,", - PrintContents(&batch, nullptr)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 1 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_0); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_0); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 5 - ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", - PrintContents(&batch, nullptr)); - - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); - - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 1 - ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); - - ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,", - PrintContents(&batch, &cf1)); - - s = batch.RollbackToSavePoint(); // no savepoint found + s = batch_->RollbackToSavePoint(); // no savepoint found ASSERT_TRUE(s.IsNotFound()); - ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + AssertItersEqual(cf0_iter.get(), &kvi_cf0_0); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_0); - ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,", - PrintContents(&batch, &cf1)); + batch_->SetSavePoint(); // 6 - batch.SetSavePoint(); // 6 + batch_->Clear(); + ASSERT_EQ("", PrintContents(batch_.get(), nullptr)); + ASSERT_EQ("", PrintContents(batch_.get(), &cf1)); - batch.Clear(); - ASSERT_EQ("", PrintContents(&batch, nullptr)); - ASSERT_EQ("", PrintContents(&batch, &cf1)); - - s = batch.RollbackToSavePoint(); // rollback to 6 + s = batch_->RollbackToSavePoint(); // rollback to 6 ASSERT_TRUE(s.IsNotFound()); } -TEST_F(WriteBatchWithIndexTest, SingleDeleteTest) { - WriteBatchWithIndex batch; +TEST_P(WriteBatchWithIndexTest, SingleDeleteTest) { Status s; std::string value; - DBOptions db_options; - ASSERT_OK(batch.SingleDelete("A")); + ASSERT_OK(batch_->SingleDelete("A")); - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_TRUE(s.IsNotFound()); - value = PrintContents(&batch, nullptr); - ASSERT_EQ("SINGLE-DEL(A),", value); - batch.Clear(); - ASSERT_OK(batch.Put("A", "a")); - ASSERT_OK(batch.Put("A", "a2")); - ASSERT_OK(batch.Put("B", "b")); - ASSERT_OK(batch.SingleDelete("A")); + batch_->Clear(); + ASSERT_OK(batch_->Put("A", "a")); + ASSERT_OK(batch_->Put("A", "a2")); + ASSERT_OK(batch_->Put("B", "b")); + ASSERT_OK(batch_->SingleDelete("A")); - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); - value = PrintContents(&batch, nullptr); - ASSERT_EQ("PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(B):b,", value); + ASSERT_OK(batch_->Put("C", "c")); + ASSERT_OK(batch_->Put("A", "a3")); + ASSERT_OK(batch_->Delete("B")); + ASSERT_OK(batch_->SingleDelete("B")); + ASSERT_OK(batch_->SingleDelete("C")); - ASSERT_OK(batch.Put("C", "c")); - ASSERT_OK(batch.Put("A", "a3")); - ASSERT_OK(batch.Delete("B")); - ASSERT_OK(batch.SingleDelete("B")); - ASSERT_OK(batch.SingleDelete("C")); - - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_OK(s); ASSERT_EQ("a3", value); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "C", &value); + s = batch_->GetFromBatch(options_, "C", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "D", &value); + s = batch_->GetFromBatch(options_, "D", &value); ASSERT_TRUE(s.IsNotFound()); - value = PrintContents(&batch, nullptr); - ASSERT_EQ( - "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,PUT(B):b,DEL(B),SINGLE-DEL(B)" - ",PUT(C):c,SINGLE-DEL(C),", - value); + ASSERT_OK(batch_->Put("B", "b4")); + ASSERT_OK(batch_->Put("C", "c4")); + ASSERT_OK(batch_->Put("D", "d4")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->Delete("A")); - ASSERT_OK(batch.Put("B", "b4")); - ASSERT_OK(batch.Put("C", "c4")); - ASSERT_OK(batch.Put("D", "d4")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.Delete("A")); - - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_OK(s); ASSERT_EQ("b4", value); - s = batch.GetFromBatch(db_options, "C", &value); + s = batch_->GetFromBatch(options_, "C", &value); ASSERT_OK(s); ASSERT_EQ("c4", value); - s = batch.GetFromBatch(db_options, "D", &value); + s = batch_->GetFromBatch(options_, "D", &value); ASSERT_TRUE(s.IsNotFound()); - - value = PrintContents(&batch, nullptr); - ASSERT_EQ( - "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,DEL(A),PUT(B):b,DEL(B)," - "SINGLE-DEL(B),PUT(B):b4,PUT(C):c,SINGLE-DEL(C),PUT(C):c4,PUT(D):d4," - "SINGLE-DEL(D),SINGLE-DEL(D),", - value); } -TEST_F(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) { +TEST_P(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) { std::string value; - DBOptions db_options; - WriteBatchWithIndex batch(BytewiseComparator(), 20, true /* overwrite_key */); - - ASSERT_OK(batch.Put("A", "a")); - ASSERT_OK(batch.Put("A", "a2")); - ASSERT_OK(batch.Put("B", "b")); - ASSERT_OK(batch.SingleDelete("A")); - ASSERT_OK(batch.Delete("B")); + ASSERT_OK(batch_->Put("A", "a")); + ASSERT_OK(batch_->Put("A", "a2")); + ASSERT_OK(batch_->Put("B", "b")); + ASSERT_OK(batch_->SingleDelete("A")); + ASSERT_OK(batch_->Delete("B")); KVMap map; - value = PrintContents(&batch, &map, nullptr); + value = PrintContents(batch_.get(), &map, nullptr); ASSERT_EQ("", value); map["A"] = "aa"; map["C"] = "cc"; map["D"] = "dd"; - ASSERT_OK(batch.SingleDelete("B")); - ASSERT_OK(batch.SingleDelete("C")); - ASSERT_OK(batch.SingleDelete("Z")); + ASSERT_OK(batch_->SingleDelete("B")); + ASSERT_OK(batch_->SingleDelete("C")); + ASSERT_OK(batch_->SingleDelete("Z")); - value = PrintContents(&batch, &map, nullptr); + value = PrintContents(batch_.get(), &map, nullptr); ASSERT_EQ("D:dd,", value); - ASSERT_OK(batch.Put("A", "a3")); - ASSERT_OK(batch.Put("B", "b3")); - ASSERT_OK(batch.SingleDelete("A")); - ASSERT_OK(batch.SingleDelete("A")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.Delete("D")); + ASSERT_OK(batch_->Put("A", "a3")); + ASSERT_OK(batch_->Put("B", "b3")); + ASSERT_OK(batch_->SingleDelete("A")); + ASSERT_OK(batch_->SingleDelete("A")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->Delete("D")); map["E"] = "ee"; - value = PrintContents(&batch, &map, nullptr); + value = PrintContents(batch_.get(), &map, nullptr); ASSERT_EQ("B:b3,E:ee,", value); } +TEST_P(WriteBatchWithIndexTest, MultiGetTest) { + // MultiGet a lot of keys in order to force std::vector reallocations + std::vector keys; + for (int i = 0; i < 100; ++i) { + keys.emplace_back(std::to_string(i)); + } + + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + // Write some data to the db for the even numbered keys + { + WriteBatch wb; + for (size_t i = 1; i < keys.size(); ++i) { + std::string val = "val" + std::to_string(i); + ASSERT_OK(wb.Put(cf0, keys[i], val)); + } + ASSERT_OK(db_->Write(write_opts_, &wb)); + for (size_t i = 1; i < keys.size(); ++i) { + std::string value; + ASSERT_OK(db_->Get(read_opts_, cf0, keys[i], &value)); + } + } + + // Write some data to the batch + for (size_t i = 0; i < keys.size(); ++i) { + if ((i % 5) == 0) { + ASSERT_OK(batch_->Delete(cf0, keys[i])); + } else if ((i % 7) == 0) { + std::string val = "new" + std::to_string(i); + ASSERT_OK(batch_->Put(cf0, keys[i], val)); + } + if (i > 0 && (i % 3) == 0) { + ASSERT_OK(batch_->Merge(cf0, keys[i], "merge")); + } + } + + std::vector key_slices; + for (size_t i = 0; i < keys.size(); ++i) { + key_slices.emplace_back(keys[i]); + } + std::vector values(keys.size()); + std::vector statuses(keys.size()); + + batch_->MultiGetFromBatchAndDB(db_, read_opts_, cf0, key_slices.size(), + key_slices.data(), values.data(), + statuses.data(), false); + for (size_t i = 0; i < keys.size(); ++i) { + if (i == 0) { + ASSERT_TRUE(statuses[i].IsNotFound()); + } else if ((i % 3) == 0) { + ASSERT_OK(statuses[i]); + if ((i % 5) == 0) { // Merge after Delete + ASSERT_EQ(values[i], "merge"); + } else if ((i % 7) == 0) { // Merge after Put + std::string val = "new" + std::to_string(i); + ASSERT_EQ(values[i], val + ",merge"); + } else { + std::string val = "val" + std::to_string(i); + ASSERT_EQ(values[i], val + ",merge"); + } + } else if ((i % 5) == 0) { + ASSERT_TRUE(statuses[i].IsNotFound()); + } else if ((i % 7) == 0) { + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], "new" + std::to_string(i)); + } else { + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], "val" + std::to_string(i)); + } + } +} + +// This test has merges, but the merge does not play into the final result +TEST_P(WriteBatchWithIndexTest, FakeMergeWithIteratorTest) { + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + // The map we are starting with + KVMap input = { + {"odm", "odm0"}, + {"omd", "omd0"}, + {"omp", "omp0"}, + }; + KVMap result = { + {"odm", "odm2"}, // Orig, Delete, Merge + {"mp", "mp1"}, // Merge, Put + {"omp", "omp2"}, // Origi, Merge, Put + {"mmp", "mmp2"} // Merge, Merge, Put + }; + + for (auto& iter : result) { + EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second); + } + AddToBatch(cf0, "md"); // Merge, Delete + AddToBatch(cf0, "mmd"); // Merge, Merge, Delete + AddToBatch(cf0, "omd"); // Orig, Merge, Delete + + KVIter kvi(&result); + // First try just the batch + std::unique_ptr iter( + batch_->NewIteratorWithBase(cf0, new KVIter(&input))); + AssertItersEqual(iter.get(), &kvi); +} + +TEST_P(WriteBatchWithIndexTest, IteratorMergeTest) { + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + KVMap result = { + {"m", "m0"}, // Merge + {"mm", "mm0,mm1"}, // Merge, Merge + {"dm", "dm1"}, // Delete, Merge + {"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge + {"mdm", "mdm2"}, // Merge, Delete, Merge + {"mpm", "mpm1,mpm2"}, // Merge, Put, Merge + {"pm", "pm0,pm1"}, // Put, Merge + {"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge + }; + + for (auto& iter : result) { + EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second); + } + + KVIter kvi(&result); + // First try just the batch + KVMap empty_map; + std::unique_ptr iter( + batch_->NewIteratorWithBase(cf0, new KVIter(&empty_map))); + AssertItersEqual(iter.get(), &kvi); +} + +TEST_P(WriteBatchWithIndexTest, IteratorMergeTestWithOrig) { + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + KVMap original; + KVMap results = { + {"m", "om,m0"}, // Merge + {"mm", "omm,mm0,mm1"}, // Merge, Merge + {"dm", "dm1"}, // Delete, Merge + {"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge + {"mdm", "mdm2"}, // Merge, Delete, Merge + {"mpm", "mpm1,mpm2"}, // Merge, Put, Merge + {"pm", "pm0,pm1"}, // Put, Merge + {"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge + }; + + for (auto& iter : results) { + AddToBatch(cf0, iter.first); + original[iter.first] = "o" + iter.first; + } + + KVIter kvi(&results); + // First try just the batch + std::unique_ptr iter( + batch_->NewIteratorWithBase(cf0, new KVIter(&original))); + AssertItersEqual(iter.get(), &kvi); +} + +TEST_P(WriteBatchWithIndexTest, GetFromBatchAfterMerge) { + std::string value; + Status s; + + ASSERT_OK(OpenDB()); + ASSERT_OK(db_->Put(write_opts_, "o", "aa")); + batch_->Merge("o", "bb"); // Merging bb under key "o" + batch_->Merge("m", "cc"); // Merging bc under key "m" + s = batch_->GetFromBatch(options_, "m", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + s = batch_->GetFromBatch(options_, "o", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + + ASSERT_OK(db_->Write(write_opts_, batch_->GetWriteBatch())); + ASSERT_OK(db_->Get(read_opts_, "o", &value)); + ASSERT_EQ(value, "aa,bb"); + ASSERT_OK(db_->Get(read_opts_, "m", &value)); + ASSERT_EQ(value, "cc"); +} + +TEST_P(WriteBatchWithIndexTest, GetFromBatchAndDBAfterMerge) { + std::string value; + + ASSERT_OK(OpenDB()); + ASSERT_OK(db_->Put(write_opts_, "o", "aa")); + ASSERT_OK(batch_->Merge("o", "bb")); // Merging bb under key "o" + ASSERT_OK(batch_->Merge("m", "cc")); // Merging bc under key "m" + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "o", &value)); + ASSERT_EQ(value, "aa,bb"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "m", &value)); + ASSERT_EQ(value, "cc"); +} + +TEST_F(WBWIKeepTest, GetAfterPut) { + std::string value; + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + ASSERT_OK(db_->Put(write_opts_, "key", "orig")); + + ASSERT_OK(batch_->Put("key", "aa")); // Writing aa under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "aa"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa"); + + ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "aa,bb"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa,bb"); + + ASSERT_OK(batch_->Merge("key", "cc")); // Merging cc under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "aa,bb,cc"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa,bb,cc"); +} + +TEST_P(WriteBatchWithIndexTest, GetAfterMergePut) { + std::string value; + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + ASSERT_OK(db_->Put(write_opts_, "key", "orig")); + + ASSERT_OK(batch_->Merge("key", "aa")); // Merging aa under key + Status s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "orig,aa"); + + ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key + s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "orig,aa,bb"); + + ASSERT_OK(batch_->Put("key", "cc")); // Writing cc under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc"); + + ASSERT_OK(batch_->Merge("key", "dd")); // Merging dd under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); +} + +TEST_P(WriteBatchWithIndexTest, GetAfterMergeDelete) { + std::string value; + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + ASSERT_OK(batch_->Merge("key", "aa")); // Merging aa under key + Status s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa"); + + ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key + s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa,bb"); + + ASSERT_OK(batch_->Delete("key")); // Delete key from batch + s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value); + ASSERT_TRUE(s.IsNotFound()); + + ASSERT_OK(batch_->Merge("key", "cc")); // Merging cc under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc"); + ASSERT_OK(batch_->Merge("key", "dd")); // Merging dd under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); +} + +TEST_F(WBWIOverwriteTest, TestBadMergeOperator) { + class FailingMergeOperator : public MergeOperator { + public: + FailingMergeOperator() {} + + bool FullMergeV2(const MergeOperationInput& /*merge_in*/, + MergeOperationOutput* /*merge_out*/) const override { + return false; + } + + const char* Name() const override { return "Failing"; } + }; + options_.merge_operator.reset(new FailingMergeOperator()); + ASSERT_OK(OpenDB()); + + ColumnFamilyHandle* column_family = db_->DefaultColumnFamily(); + std::string value; + + ASSERT_OK(db_->Put(write_opts_, "a", "a0")); + ASSERT_OK(batch_->Put("b", "b0")); + + ASSERT_OK(batch_->Merge("a", "a1")); + ASSERT_NOK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); + ASSERT_NOK(batch_->GetFromBatch(column_family, options_, "a", &value)); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value)); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "b", &value)); +} + +INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {