diff --git a/HISTORY.md b/HISTORY.md index e0e4ffec3..163c6788f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -34,6 +34,8 @@ * `CompactFiles()` can no longer compact files from lower level to up level, which has the risk to corrupt DB (details: #8063). The validation is also added to all compactions. * Fixed some cases in which DB::OpenForReadOnly() could write to the filesystem. If you want a Logger with a read-only DB, you must now set DBOptions::info_log yourself, such as using CreateLoggerFromOptions(). * get_iostats_context() will never return nullptr. If thread-local support is not available, and user does not opt-out iostats context, then compilation will fail. The same applies to perf context as well. +* Added support for WriteBatchWithIndex::NewIteratorWithBase when overwrite_key=false. Previously, this combination was not supported and would assert or return nullptr. +* Improve the behavior of WriteBatchWithIndex for Merge operations. Now more operations may be stored in order to return the correct merged result. ### Bug Fixes * Use thread-safe `strerror_r()` to get error messages. diff --git a/db/merge_context.h b/db/merge_context.h index e1869a341..925bfc0e0 100644 --- a/db/merge_context.h +++ b/db/merge_context.h @@ -68,7 +68,7 @@ class MergeContext { } // Get the operand at the index. - Slice GetOperand(int index) { + Slice GetOperand(int index) const { assert(operand_list_); SetDirectionForward(); @@ -76,13 +76,21 @@ class MergeContext { } // Same as GetOperandsDirectionForward - const std::vector& GetOperands() { + // + // Note that the returned reference is only good until another call + // to this MergeContext. If the returned value is needed for longer, + // a copy must be made. + const std::vector& GetOperands() const { return GetOperandsDirectionForward(); } // Return all the operands in the order as they were merged (passed to // FullMerge or FullMergeV2) - const std::vector& GetOperandsDirectionForward() { + // + // Note that the returned reference is only good until another call + // to this MergeContext. If the returned value is needed for longer, + // a copy must be made. + const std::vector& GetOperandsDirectionForward() const { if (!operand_list_) { return empty_operand_list; } @@ -93,7 +101,11 @@ class MergeContext { // Return all the operands in the reversed order relative to how they were // merged (passed to FullMerge or FullMergeV2) - const std::vector& GetOperandsDirectionBackward() { + // + // Note that the returned reference is only good until another call + // to this MergeContext. If the returned value is needed for longer, + // a copy must be made. + const std::vector& GetOperandsDirectionBackward() const { if (!operand_list_) { return empty_operand_list; } @@ -110,14 +122,14 @@ class MergeContext { } } - void SetDirectionForward() { + void SetDirectionForward() const { if (operands_reversed_ == true) { std::reverse(operand_list_->begin(), operand_list_->end()); operands_reversed_ = false; } } - void SetDirectionBackward() { + void SetDirectionBackward() const { if (operands_reversed_ == false) { std::reverse(operand_list_->begin(), operand_list_->end()); operands_reversed_ = true; @@ -125,10 +137,10 @@ class MergeContext { } // List of operands - std::unique_ptr> operand_list_; + mutable std::unique_ptr> operand_list_; // Copy of operands that are not pinned. std::unique_ptr>> copied_operands_; - bool operands_reversed_ = true; + mutable bool operands_reversed_ = true; }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 49fa99d7d..1b713acc8 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -306,7 +306,8 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) { Iterator* db_iter = db_->NewIterator(read_options); assert(db_iter); - return write_batch_.NewIteratorWithBase(db_iter); + return write_batch_.NewIteratorWithBase(db_->DefaultColumnFamily(), db_iter, + &read_options); } Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index def6d0f6b..81a1e3a61 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2814,7 +2814,8 @@ TEST_P(TransactionTest, MultiGetBatchedTest) { ASSERT_TRUE(statuses[1].IsNotFound()); ASSERT_TRUE(statuses[2].ok()); ASSERT_EQ(values[2], "val3_new"); - ASSERT_TRUE(statuses[3].IsMergeInProgress()); + ASSERT_TRUE(statuses[3].ok()); + ASSERT_EQ(values[3], "foo,bar"); ASSERT_TRUE(statuses[4].ok()); ASSERT_EQ(values[4], "val5"); ASSERT_TRUE(statuses[5].ok()); @@ -4839,7 +4840,8 @@ TEST_P(TransactionTest, MergeTest) { ASSERT_OK(s); s = txn->Get(read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(s); + ASSERT_EQ("a0,1,2", value); s = txn->Put("A", "a"); ASSERT_OK(s); @@ -4852,7 +4854,8 @@ TEST_P(TransactionTest, MergeTest) { ASSERT_OK(s); s = txn->Get(read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(s); + ASSERT_EQ("a,3", value); TransactionOptions txn_options; txn_options.lock_timeout = 1; // 1 ms diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 809d3d04c..97b565a13 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -53,13 +53,16 @@ struct WriteBatchWithIndex::Rep { // In overwrite mode, find the existing entry for the same key and update it // to point to the current entry. // Return true if the key is found and updated. - bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key); - bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key); + bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key, + WriteType type); + bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key, + WriteType type); // Add the recent entry to the update. // In overwrite mode, if key already exists in the index, update it. - void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key); - void AddOrUpdateIndex(const Slice& key); + void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key, + WriteType type); + void AddOrUpdateIndex(const Slice& key, WriteType type); // Allocate an index entry pointing to the last entry in the write batch and // put it to skip list. @@ -75,13 +78,13 @@ struct WriteBatchWithIndex::Rep { }; bool WriteBatchWithIndex::Rep::UpdateExistingEntry( - ColumnFamilyHandle* column_family, const Slice& key) { + ColumnFamilyHandle* column_family, const Slice& key, WriteType type) { uint32_t cf_id = GetColumnFamilyID(column_family); - return UpdateExistingEntryWithCfId(cf_id, key); + return UpdateExistingEntryWithCfId(cf_id, key, type); } bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( - uint32_t column_family_id, const Slice& key) { + uint32_t column_family_id, const Slice& key, WriteType type) { if (!overwrite_key) { return false; } @@ -91,9 +94,16 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( iter.Seek(key); if (!iter.Valid()) { return false; - } - if (!iter.MatchesKey(column_family_id, key)) { + } else if (!iter.MatchesKey(column_family_id, key)) { return false; + } else { + // Move to the end of this key (NextKey-Prev) + iter.NextKey(); // Move to the next key + if (iter.Valid()) { + iter.Prev(); // Move back one entry + } else { + iter.SeekToLast(); + } } WriteBatchIndexEntry* non_const_entry = const_cast(iter.GetRawEntry()); @@ -101,13 +111,17 @@ bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( last_sub_batch_offset = last_entry_offset; sub_batch_cnt++; } - non_const_entry->offset = last_entry_offset; - return true; + if (type == kMergeRecord) { + return false; + } else { + non_const_entry->offset = last_entry_offset; + return true; + } } void WriteBatchWithIndex::Rep::AddOrUpdateIndex( - ColumnFamilyHandle* column_family, const Slice& key) { - if (!UpdateExistingEntry(column_family, key)) { + ColumnFamilyHandle* column_family, const Slice& key, WriteType type) { + if (!UpdateExistingEntry(column_family, key, type)) { uint32_t cf_id = GetColumnFamilyID(column_family); const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); if (cf_cmp != nullptr) { @@ -117,8 +131,9 @@ void WriteBatchWithIndex::Rep::AddOrUpdateIndex( } } -void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { - if (!UpdateExistingEntryWithCfId(0, key)) { +void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key, + WriteType type) { + if (!UpdateExistingEntryWithCfId(0, key, type)) { AddNewEntry(0); } } @@ -190,14 +205,31 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { switch (tag) { case kTypeColumnFamilyValue: case kTypeValue: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key, kPutRecord)) { + AddNewEntry(column_family_id); + } + break; case kTypeColumnFamilyDeletion: case kTypeDeletion: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key, + kDeleteRecord)) { + AddNewEntry(column_family_id); + } + break; case kTypeColumnFamilySingleDeletion: case kTypeSingleDeletion: + found++; + if (!UpdateExistingEntryWithCfId(column_family_id, key, + kSingleDeleteRecord)) { + AddNewEntry(column_family_id); + } + break; case kTypeColumnFamilyMerge: case kTypeMerge: found++; - if (!UpdateExistingEntryWithCfId(column_family_id, key)) { + if (!UpdateExistingEntryWithCfId(column_family_id, key, kMergeRecord)) { AddNewEntry(column_family_id); } break; @@ -255,22 +287,19 @@ WBWIIterator* WriteBatchWithIndex::NewIterator( Iterator* WriteBatchWithIndex::NewIteratorWithBase( ColumnFamilyHandle* column_family, Iterator* base_iterator, const ReadOptions* read_options) { - if (rep->overwrite_key == false) { - assert(false); - return nullptr; - } - return new BaseDeltaIterator(base_iterator, NewIterator(column_family), + auto wbwiii = + new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list), + &rep->write_batch, &rep->comparator); + return new BaseDeltaIterator(column_family, base_iterator, wbwiii, GetColumnFamilyUserComparator(column_family), read_options); } Iterator* WriteBatchWithIndex::NewIteratorWithBase(Iterator* base_iterator) { - if (rep->overwrite_key == false) { - assert(false); - return nullptr; - } // default column family's comparator - return new BaseDeltaIterator(base_iterator, NewIterator(), + auto wbwiii = new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch, + &rep->comparator); + return new BaseDeltaIterator(nullptr, base_iterator, wbwiii, rep->comparator.default_comparator()); } @@ -279,7 +308,7 @@ Status WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.Put(column_family, key, value); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kPutRecord); } return s; } @@ -288,7 +317,7 @@ Status WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); auto s = rep->write_batch.Put(key, value); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kPutRecord); } return s; } @@ -298,7 +327,7 @@ Status WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.Delete(column_family, key); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kDeleteRecord); } return s; } @@ -307,7 +336,7 @@ Status WriteBatchWithIndex::Delete(const Slice& key) { rep->SetLastEntryOffset(); auto s = rep->write_batch.Delete(key); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kDeleteRecord); } return s; } @@ -317,7 +346,7 @@ Status WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.SingleDelete(column_family, key); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kSingleDeleteRecord); } return s; } @@ -326,7 +355,7 @@ Status WriteBatchWithIndex::SingleDelete(const Slice& key) { rep->SetLastEntryOffset(); auto s = rep->write_batch.SingleDelete(key); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kSingleDeleteRecord); } return s; } @@ -336,7 +365,7 @@ Status WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, rep->SetLastEntryOffset(); auto s = rep->write_batch.Merge(column_family, key, value); if (s.ok()) { - rep->AddOrUpdateIndex(column_family, key); + rep->AddOrUpdateIndex(column_family, key, kMergeRecord); } return s; } @@ -345,7 +374,7 @@ Status WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); auto s = rep->write_batch.Merge(key, value); if (s.ok()) { - rep->AddOrUpdateIndex(key); + rep->AddOrUpdateIndex(key, kMergeRecord); } return s; } @@ -361,18 +390,18 @@ Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { Status s; WriteBatchWithIndexInternal wbwii(&options, column_family); - auto result = wbwii.GetFromBatch(this, key, value, rep->overwrite_key, &s); + auto result = wbwii.GetFromBatch(this, key, value, &s); switch (result) { - case WriteBatchWithIndexInternal::Result::kFound: - case WriteBatchWithIndexInternal::Result::kError: + case WBWIIteratorImpl::kFound: + case WBWIIteratorImpl::kError: // use returned status break; - case WriteBatchWithIndexInternal::Result::kDeleted: - case WriteBatchWithIndexInternal::Result::kNotFound: + case WBWIIteratorImpl::kDeleted: + case WBWIIteratorImpl::kNotFound: s = Status::NotFound(); break; - case WriteBatchWithIndexInternal::Result::kMergeInProgress: + case WBWIIteratorImpl::kMergeInProgress: s = Status::MergeInProgress(); break; default: @@ -440,29 +469,18 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( // we cannot pin it as otherwise the returned value will not be available // after the transaction finishes. std::string& batch_value = *pinnable_val->GetSelf(); - auto result = - wbwii.GetFromBatch(this, key, &batch_value, rep->overwrite_key, &s); + auto result = wbwii.GetFromBatch(this, key, &batch_value, &s); - if (result == WriteBatchWithIndexInternal::Result::kFound) { + if (result == WBWIIteratorImpl::kFound) { pinnable_val->PinSelf(); return s; - } - if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + } else if (!s.ok() || result == WBWIIteratorImpl::kError) { + return s; + } else if (result == WBWIIteratorImpl::kDeleted) { return Status::NotFound(); } - if (result == WriteBatchWithIndexInternal::Result::kError) { - return s; - } - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && - rep->overwrite_key == true) { - // Since we've overwritten keys, we do not know what other operations are - // in this batch for this key, so we cannot do a Merge to compute the - // result. Instead, we will simply return MergeInProgress. - return Status::MergeInProgress(); - } - - assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || - result == WriteBatchWithIndexInternal::Result::kNotFound); + assert(result == WBWIIteratorImpl::kMergeInProgress || + result == WBWIIteratorImpl::kNotFound); // Did not find key in batch OR could not resolve Merges. Try DB. if (!callback) { @@ -477,7 +495,7 @@ Status WriteBatchWithIndex::GetFromBatchAndDB( } if (s.ok() || s.IsNotFound()) { // DB Get Succeeded - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) { + if (result == WBWIIteratorImpl::kMergeInProgress) { // Merge result from DB with merges in Batch std::string merge_result; if (s.ok()) { @@ -513,7 +531,7 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( autovector key_context; autovector sorted_keys; // To hold merges from the write batch - autovector, + autovector, MultiGetContext::MAX_BATCH_SIZE> merges; // Since the lifetime of the WriteBatch is the same as that of the transaction @@ -524,31 +542,22 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( PinnableSlice* pinnable_val = &values[i]; std::string& batch_value = *pinnable_val->GetSelf(); Status* s = &statuses[i]; - auto result = wbwii.GetFromBatch(this, keys[i], &merge_context, - &batch_value, rep->overwrite_key, s); + auto result = + wbwii.GetFromBatch(this, keys[i], &merge_context, &batch_value, s); - if (result == WriteBatchWithIndexInternal::Result::kFound) { + if (result == WBWIIteratorImpl::kFound) { pinnable_val->PinSelf(); continue; } - if (result == WriteBatchWithIndexInternal::Result::kDeleted) { + if (result == WBWIIteratorImpl::kDeleted) { *s = Status::NotFound(); continue; } - if (result == WriteBatchWithIndexInternal::Result::kError) { + if (result == WBWIIteratorImpl::kError) { continue; } - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && - rep->overwrite_key == true) { - // Since we've overwritten keys, we do not know what other operations are - // in this batch for this key, so we cannot do a Merge to compute the - // result. Instead, we will simply return MergeInProgress. - *s = Status::MergeInProgress(); - continue; - } - - assert(result == WriteBatchWithIndexInternal::Result::kMergeInProgress || - result == WriteBatchWithIndexInternal::Result::kNotFound); + assert(result == WBWIIteratorImpl::kMergeInProgress || + result == WBWIIteratorImpl::kNotFound); key_context.emplace_back(column_family, keys[i], &values[i], /*timestamp*/ nullptr, &statuses[i]); merges.emplace_back(result, std::move(merge_context)); @@ -569,10 +578,9 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( KeyContext& key = *iter; if (key.s->ok() || key.s->IsNotFound()) { // DB Get Succeeded size_t index = iter - key_context.begin(); - std::pair& - merge_result = merges[index]; - if (merge_result.first == - WriteBatchWithIndexInternal::Result::kMergeInProgress) { + std::pair& merge_result = + merges[index]; + if (merge_result.first == WBWIIteratorImpl::kMergeInProgress) { // Merge result from DB with merges in Batch if (key.s->ok()) { *key.s = wbwii.MergeKey(*key.key, iter->value, merge_result.second, diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index fb706bf88..81674cf09 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -19,8 +19,9 @@ #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { -BaseDeltaIterator::BaseDeltaIterator(Iterator* base_iterator, - WBWIIterator* delta_iterator, +BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family, + Iterator* base_iterator, + WBWIIteratorImpl* delta_iterator, const Comparator* comparator, const ReadOptions* read_options) : forward_(true), @@ -31,7 +32,9 @@ BaseDeltaIterator::BaseDeltaIterator(Iterator* base_iterator, delta_iterator_(delta_iterator), comparator_(comparator), iterate_upper_bound_(read_options ? read_options->iterate_upper_bound - : nullptr) {} + : nullptr) { + wbwii_.reset(new WriteBatchWithIndexInternal(column_family)); +} bool BaseDeltaIterator::Valid() const { return status_.ok() ? (current_at_base_ ? BaseValid() : DeltaValid()) : false; @@ -144,8 +147,32 @@ Slice BaseDeltaIterator::key() const { } Slice BaseDeltaIterator::value() const { - return current_at_base_ ? base_iterator_->value() - : delta_iterator_->Entry().value; + if (current_at_base_) { + return base_iterator_->value(); + } else { + WriteEntry delta_entry = delta_iterator_->Entry(); + if (wbwii_->GetNumOperands() == 0) { + return delta_entry.value; + } else if (delta_entry.type == kDeleteRecord || + delta_entry.type == kSingleDeleteRecord) { + status_ = + wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf()); + } else if (delta_entry.type == kPutRecord) { + status_ = wbwii_->MergeKey(delta_entry.key, &delta_entry.value, + merge_result_.GetSelf()); + } else if (delta_entry.type == kMergeRecord) { + if (equal_keys_) { + Slice base_value = base_iterator_->value(); + status_ = wbwii_->MergeKey(delta_entry.key, &base_value, + merge_result_.GetSelf()); + } else { + status_ = + wbwii_->MergeKey(delta_entry.key, nullptr, merge_result_.GetSelf()); + } + } + merge_result_.PinSelf(); + return merge_result_; + } } Status BaseDeltaIterator::status() const { @@ -228,12 +255,11 @@ void BaseDeltaIterator::Advance() { void BaseDeltaIterator::AdvanceDelta() { if (forward_) { - delta_iterator_->Next(); + delta_iterator_->NextKey(); } else { - delta_iterator_->Prev(); + delta_iterator_->PrevKey(); } } - void BaseDeltaIterator::AdvanceBase() { if (forward_) { base_iterator_->Next(); @@ -243,17 +269,18 @@ void BaseDeltaIterator::AdvanceBase() { } bool BaseDeltaIterator::BaseValid() const { return base_iterator_->Valid(); } - bool BaseDeltaIterator::DeltaValid() const { return delta_iterator_->Valid(); } - void BaseDeltaIterator::UpdateCurrent() { // Suppress false positive clang analyzer warnings. #ifndef __clang_analyzer__ status_ = Status::OK(); while (true) { + auto delta_result = WBWIIteratorImpl::kNotFound; WriteEntry delta_entry; if (DeltaValid()) { assert(delta_iterator_->status().ok()); + delta_result = + delta_iterator_->FindLatestUpdate(wbwii_->GetMergeContext()); delta_entry = delta_iterator_->Entry(); } else if (!delta_iterator_->status().ok()) { // Expose the error status and stop. @@ -279,8 +306,8 @@ void BaseDeltaIterator::UpdateCurrent() { return; } } - if (delta_entry.type == kDeleteRecord || - delta_entry.type == kSingleDeleteRecord) { + if (delta_result == WBWIIteratorImpl::kDeleted && + wbwii_->GetNumOperands() == 0) { AdvanceDelta(); } else { current_at_base_ = false; @@ -298,8 +325,8 @@ void BaseDeltaIterator::UpdateCurrent() { if (compare == 0) { equal_keys_ = true; } - if (delta_entry.type != kDeleteRecord && - delta_entry.type != kSingleDeleteRecord) { + if (delta_result != WBWIIteratorImpl::kDeleted || + wbwii_->GetNumOperands() > 0) { current_at_base_ = false; return; } @@ -319,9 +346,105 @@ void BaseDeltaIterator::UpdateCurrent() { #endif // __clang_analyzer__ } -class Env; -class Logger; -class Statistics; +void WBWIIteratorImpl::AdvanceKey(bool forward) { + if (Valid()) { + Slice key = Entry().key; + do { + if (forward) { + Next(); + } else { + Prev(); + } + } while (MatchesKey(column_family_id_, key)); + } +} + +void WBWIIteratorImpl::NextKey() { AdvanceKey(true); } + +void WBWIIteratorImpl::PrevKey() { + AdvanceKey(false); // Move to the tail of the previous key + if (Valid()) { + AdvanceKey(false); // Move back another key. Now we are at the start of + // the previous key + if (Valid()) { // Still a valid + Next(); // Move forward one onto this key + } else { + SeekToFirst(); // Not valid, move to the start + } + } +} + +WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate( + MergeContext* merge_context) { + if (Valid()) { + Slice key = Entry().key; + return FindLatestUpdate(key, merge_context); + } else { + merge_context->Clear(); // Clear any entries in the MergeContext + return WBWIIteratorImpl::kNotFound; + } +} + +WBWIIteratorImpl::Result WBWIIteratorImpl::FindLatestUpdate( + const Slice& key, MergeContext* merge_context) { + Result result = WBWIIteratorImpl::kNotFound; + merge_context->Clear(); // Clear any entries in the MergeContext + // TODO(agiardullo): consider adding support for reverse iteration + if (!Valid()) { + return result; + } else if (comparator_->CompareKey(column_family_id_, Entry().key, key) != + 0) { + return result; + } else { + // We want to iterate in the reverse order that the writes were added to the + // batch. Since we don't have a reverse iterator, we must seek past the + // end. We do this by seeking to the next key, and then back one step + NextKey(); + if (Valid()) { + Prev(); + } else { + SeekToLast(); + } + + // We are at the end of the iterator for this key. Search backwards for the + // last Put or Delete, accumulating merges along the way. + while (Valid()) { + const WriteEntry entry = Entry(); + if (comparator_->CompareKey(column_family_id_, entry.key, key) != 0) { + break; // Unexpected error or we've reached a different next key + } + + switch (entry.type) { + case kPutRecord: + return WBWIIteratorImpl::kFound; + case kDeleteRecord: + return WBWIIteratorImpl::kDeleted; + case kSingleDeleteRecord: + return WBWIIteratorImpl::kDeleted; + case kMergeRecord: + result = WBWIIteratorImpl::kMergeInProgress; + merge_context->PushOperand(entry.value); + break; + case kLogDataRecord: + break; // ignore + case kXIDRecord: + break; // ignore + default: + return WBWIIteratorImpl::kError; + } // end switch statement + Prev(); + } // End while Valid() + // At this point, we have been through the whole list and found no Puts or + // Deletes. The iterator points to the previous key. Move the iterator back + // onto this one. + if (Valid()) { + Next(); + } else { + SeekToFirst(); + } + } + return result; +} Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, @@ -479,6 +602,10 @@ bool WBWIIteratorImpl::MatchesKey(uint32_t cf_id, const Slice& key) { } } +WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( + ColumnFamilyHandle* column_family) + : db_(nullptr), db_options_(nullptr), column_family_(column_family) {} + WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( DB* db, ColumnFamilyHandle* column_family) : db_(db), db_options_(nullptr), column_family_(column_family) { @@ -493,9 +620,9 @@ WriteBatchWithIndexInternal::WriteBatchWithIndexInternal( Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, const Slice* value, - MergeContext& merge_context, + const MergeContext& context, std::string* result, - Slice* result_operand) { + Slice* result_operand) const { if (column_family_ != nullptr) { auto cfh = static_cast_with_check(column_family_); const auto merge_operator = cfh->cfd()->ioptions()->merge_operator.get(); @@ -509,133 +636,66 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key, Statistics* statistics = immutable_db_options.statistics.get(); Logger* logger = immutable_db_options.info_log.get(); SystemClock* clock = immutable_db_options.clock; - return MergeHelper::TimedFullMerge( - merge_operator, key, value, merge_context.GetOperands(), result, - logger, statistics, clock, result_operand); + return MergeHelper::TimedFullMerge(merge_operator, key, value, + context.GetOperands(), result, logger, + statistics, clock, result_operand); } else if (db_options_ != nullptr) { Statistics* statistics = db_options_->statistics.get(); Env* env = db_options_->env; Logger* logger = db_options_->info_log.get(); SystemClock* clock = env->GetSystemClock().get(); - return MergeHelper::TimedFullMerge( - merge_operator, key, value, merge_context.GetOperands(), result, - logger, statistics, clock, result_operand); + return MergeHelper::TimedFullMerge(merge_operator, key, value, + context.GetOperands(), result, logger, + statistics, clock, result_operand); } else { + const auto cf_opts = cfh->cfd()->ioptions(); return MergeHelper::TimedFullMerge( - merge_operator, key, value, merge_context.GetOperands(), result, - nullptr, nullptr, SystemClock::Default().get(), result_operand); + merge_operator, key, value, context.GetOperands(), result, + cf_opts->logger, cf_opts->stats, cf_opts->clock, result_operand); } } else { return Status::InvalidArgument("Must provide a column_family"); } } -WriteBatchWithIndexInternal::Result WriteBatchWithIndexInternal::GetFromBatch( - WriteBatchWithIndex* batch, const Slice& key, MergeContext* merge_context, - std::string* value, bool overwrite_key, Status* s) { - uint32_t cf_id = GetColumnFamilyID(column_family_); +WBWIIteratorImpl::Result WriteBatchWithIndexInternal::GetFromBatch( + WriteBatchWithIndex* batch, const Slice& key, MergeContext* context, + std::string* value, Status* s) { *s = Status::OK(); - Result result = kNotFound; std::unique_ptr iter( static_cast_with_check( batch->NewIterator(column_family_))); - // We want to iterate in the reverse order that the writes were added to the - // batch. Since we don't have a reverse iterator, we must seek past the end. - // TODO(agiardullo): consider adding support for reverse iteration + // Search the iterator for this key, and updates/merges to it. iter->Seek(key); - while (iter->Valid() && iter->MatchesKey(cf_id, key)) { - iter->Next(); - } - - if (!(*s).ok()) { - return WriteBatchWithIndexInternal::kError; - } - - if (!iter->Valid()) { - // Read past end of results. Reposition on last result. - iter->SeekToLast(); - } else { - iter->Prev(); - } - - Slice entry_value; - while (iter->Valid()) { - if (!iter->MatchesKey(cf_id, key)) { - // Unexpected error or we've reached a different next key - break; + auto result = iter->FindLatestUpdate(key, context); + if (result == WBWIIteratorImpl::kError) { + (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", + ToString(iter->Entry().type)); + return result; + } else if (result == WBWIIteratorImpl::kNotFound) { + return result; + } else if (result == WBWIIteratorImpl::Result::kFound) { // PUT + Slice entry_value = iter->Entry().value; + if (context->GetNumOperands() > 0) { + *s = MergeKey(key, &entry_value, *context, value); + if (!s->ok()) { + result = WBWIIteratorImpl::Result::kError; + } + } else { + value->assign(entry_value.data(), entry_value.size()); } - - const WriteEntry entry = iter->Entry(); - switch (entry.type) { - case kPutRecord: { - result = WriteBatchWithIndexInternal::Result::kFound; - entry_value = entry.value; - break; - } - case kMergeRecord: { - result = WriteBatchWithIndexInternal::Result::kMergeInProgress; - merge_context->PushOperand(entry.value); - break; - } - case kDeleteRecord: - case kSingleDeleteRecord: { - result = WriteBatchWithIndexInternal::Result::kDeleted; - break; - } - case kLogDataRecord: - case kXIDRecord: { - // ignore - break; - } - default: { - result = WriteBatchWithIndexInternal::Result::kError; - (*s) = Status::Corruption("Unexpected entry in WriteBatchWithIndex:", - ToString(entry.type)); - break; - } - } - if (result == WriteBatchWithIndexInternal::Result::kFound || - result == WriteBatchWithIndexInternal::Result::kDeleted || - result == WriteBatchWithIndexInternal::Result::kError) { - // We can stop iterating once we find a PUT or DELETE - break; - } - if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress && - overwrite_key == true) { - // Since we've overwritten keys, we do not know what other operations are - // in this batch for this key, so we cannot do a Merge to compute the - // result. Instead, we will simply return MergeInProgress. - break; - } - - iter->Prev(); - } - - if ((*s).ok()) { - if (result == WriteBatchWithIndexInternal::Result::kFound || - result == WriteBatchWithIndexInternal::Result::kDeleted) { - // Found a Put or Delete. Merge if necessary. - if (merge_context->GetNumOperands() > 0) { - if (result == WriteBatchWithIndexInternal::Result::kFound) { - *s = MergeKey(key, &entry_value, *merge_context, value); - } else { - *s = MergeKey(key, nullptr, *merge_context, value); - } - if ((*s).ok()) { - result = WriteBatchWithIndexInternal::Result::kFound; - } else { - result = WriteBatchWithIndexInternal::Result::kError; - } - } else { // nothing to merge - if (result == WriteBatchWithIndexInternal::Result::kFound) { // PUT - value->assign(entry_value.data(), entry_value.size()); - } + } else if (result == WBWIIteratorImpl::kDeleted) { + if (context->GetNumOperands() > 0) { + *s = MergeKey(key, nullptr, *context, value); + if (s->ok()) { + result = WBWIIteratorImpl::Result::kFound; + } else { + result = WBWIIteratorImpl::Result::kError; } } } - return result; } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index c0bb81fba..60ec66e30 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -23,6 +23,8 @@ namespace ROCKSDB_NAMESPACE { class MergeContext; +class WBWIIteratorImpl; +class WriteBatchWithIndexInternal; struct Options; // when direction == forward @@ -33,7 +35,8 @@ struct Options; // * equal_keys_ <=> base_iterator == delta_iterator class BaseDeltaIterator : public Iterator { public: - BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, + BaseDeltaIterator(ColumnFamilyHandle* column_family, Iterator* base_iterator, + WBWIIteratorImpl* delta_iterator, const Comparator* comparator, const ReadOptions* read_options = nullptr); @@ -60,14 +63,16 @@ class BaseDeltaIterator : public Iterator { bool DeltaValid() const; void UpdateCurrent(); + std::unique_ptr wbwii_; bool forward_; bool current_at_base_; bool equal_keys_; - Status status_; + mutable Status status_; std::unique_ptr base_iterator_; - std::unique_ptr delta_iterator_; + std::unique_ptr delta_iterator_; const Comparator* comparator_; // not owned const Slice* iterate_upper_bound_; + mutable PinnableSlice merge_result_; }; // Key used by skip list, as the binary searchable index of WriteBatchWithIndex. @@ -174,6 +179,7 @@ typedef SkipList class WBWIIteratorImpl : public WBWIIterator { public: + enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; WBWIIteratorImpl(uint32_t column_family_id, WriteBatchEntrySkipList* skip_list, const ReadableWriteBatch* write_batch, @@ -245,6 +251,26 @@ class WBWIIteratorImpl : public WBWIIterator { bool MatchesKey(uint32_t cf_id, const Slice& key); + // Moves the to first entry of the previous key. + void PrevKey(); + // Moves the to first entry of the next key. + void NextKey(); + + // Moves the iterator to the Update (Put or Delete) for the current key + // If there are no Put/Delete, the Iterator will point to the first entry for + // this key + // @return kFound if a Put was found for the key + // @return kDeleted if a delete was found for the key + // @return kMergeInProgress if only merges were fouund for the key + // @return kError if an unsupported operation was found for the key + // @return kNotFound if no operations were found for this key + // + Result FindLatestUpdate(const Slice& key, MergeContext* merge_context); + Result FindLatestUpdate(MergeContext* merge_context); + + protected: + void AdvanceKey(bool forward); + private: uint32_t column_family_id_; WriteBatchEntrySkipList::Iterator skip_list_iter_; @@ -257,12 +283,12 @@ class WriteBatchWithIndexInternal { // For GetFromBatchAndDB or similar explicit WriteBatchWithIndexInternal(DB* db, ColumnFamilyHandle* column_family); + // For GetFromBatchAndDB or similar + explicit WriteBatchWithIndexInternal(ColumnFamilyHandle* column_family); // For GetFromBatch or similar explicit WriteBatchWithIndexInternal(const DBOptions* db_options, ColumnFamilyHandle* column_family); - enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; - // If batch contains a value for key, store it in *value and return kFound. // If batch contains a deletion for key, return Deleted. // If batch contains Merge operations as the most recent entry for a key, @@ -271,19 +297,25 @@ class WriteBatchWithIndexInternal { // and return kMergeInProgress // If batch does not contain this key, return kNotFound // Else, return kError on error with error Status stored in *s. - Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key, - std::string* value, bool overwrite_key, Status* s) { - return GetFromBatch(batch, key, &merge_context_, value, overwrite_key, s); + WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch, + const Slice& key, std::string* value, + Status* s) { + return GetFromBatch(batch, key, &merge_context_, value, s); } - Result GetFromBatch(WriteBatchWithIndex* batch, const Slice& key, - MergeContext* merge_context, std::string* value, - bool overwrite_key, Status* s); + WBWIIteratorImpl::Result GetFromBatch(WriteBatchWithIndex* batch, + const Slice& key, + MergeContext* merge_context, + std::string* value, Status* s); Status MergeKey(const Slice& key, const Slice* value, std::string* result, - Slice* result_operand = nullptr) { + Slice* result_operand = nullptr) const { return MergeKey(key, value, merge_context_, result, result_operand); } - Status MergeKey(const Slice& key, const Slice* value, MergeContext& context, - std::string* result, Slice* result_operand = nullptr); + Status MergeKey(const Slice& key, const Slice* value, + const MergeContext& context, std::string* result, + Slice* result_operand = nullptr) const; + size_t GetNumOperands() const { return merge_context_.GetNumOperands(); } + MergeContext* GetMergeContext() { return &merge_context_; } + Slice GetOperand(int index) const { return merge_context_.GetOperand(index); } private: DB* db_; diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index dfd16866e..badfc471e 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -10,8 +10,10 @@ #ifndef ROCKSDB_LITE #include "rocksdb/utilities/write_batch_with_index.h" + #include #include + #include "db/column_family.h" #include "port/stack_trace.h" #include "test_util/testharness.h" @@ -19,6 +21,7 @@ #include "util/string_util.h" #include "utilities/merge_operators.h" #include "utilities/merge_operators/string_append/stringappend.h" +#include "utilities/write_batch_with_index/write_batch_with_index_internal.h" namespace ROCKSDB_NAMESPACE { @@ -73,9 +76,241 @@ struct TestHandler : public WriteBatch::Handler { return Status::OK(); } }; + +using KVMap = std::map; + +class KVIter : public Iterator { + public: + explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} + bool Valid() const override { return iter_ != map_->end(); } + void SeekToFirst() override { iter_ = map_->begin(); } + void SeekToLast() override { + if (map_->empty()) { + iter_ = map_->end(); + } else { + iter_ = map_->find(map_->rbegin()->first); + } + } + void Seek(const Slice& k) override { + iter_ = map_->lower_bound(k.ToString()); + } + void SeekForPrev(const Slice& k) override { + iter_ = map_->upper_bound(k.ToString()); + Prev(); + } + void Next() override { ++iter_; } + void Prev() override { + if (iter_ == map_->begin()) { + iter_ = map_->end(); + return; + } + --iter_; + } + Slice key() const override { return iter_->first; } + Slice value() const override { return iter_->second; } + Status status() const override { return Status::OK(); } + + private: + const KVMap* const map_; + KVMap::const_iterator iter_; +}; + +static std::string PrintContents(WriteBatchWithIndex* batch, + ColumnFamilyHandle* column_family) { + std::string result; + + WBWIIterator* iter; + if (column_family == nullptr) { + iter = batch->NewIterator(); + } else { + iter = batch->NewIterator(column_family); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + WriteEntry e = iter->Entry(); + + if (e.type == kPutRecord) { + result.append("PUT("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kMergeRecord) { + result.append("MERGE("); + result.append(e.key.ToString()); + result.append("):"); + result.append(e.value.ToString()); + } else if (e.type == kSingleDeleteRecord) { + result.append("SINGLE-DEL("); + result.append(e.key.ToString()); + result.append(")"); + } else { + assert(e.type == kDeleteRecord); + result.append("DEL("); + result.append(e.key.ToString()); + result.append(")"); + } + + result.append(","); + iter->Next(); + } + + delete iter; + return result; +} + +static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, + ColumnFamilyHandle* column_family) { + std::string result; + + Iterator* iter; + if (column_family == nullptr) { + iter = batch->NewIteratorWithBase(new KVIter(base_map)); + } else { + iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map)); + } + + iter->SeekToFirst(); + while (iter->Valid()) { + assert(iter->status().ok()); + + Slice key = iter->key(); + Slice value = iter->value(); + + result.append(key.ToString()); + result.append(":"); + result.append(value.ToString()); + result.append(","); + + iter->Next(); + } + + delete iter; + return result; +} + +void AssertIter(Iterator* iter, const std::string& key, + const std::string& value) { + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(key, iter->key().ToString()); + ASSERT_EQ(value, iter->value().ToString()); +} + +void AssertItersMatch(Iterator* iter1, Iterator* iter2) { + ASSERT_EQ(iter1->Valid(), iter2->Valid()); + if (iter1->Valid()) { + ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); + ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); + } +} + +void AssertItersEqual(Iterator* iter1, Iterator* iter2) { + iter1->SeekToFirst(); + iter2->SeekToFirst(); + while (iter1->Valid()) { + ASSERT_EQ(iter1->Valid(), iter2->Valid()); + ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); + ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); + iter1->Next(); + iter2->Next(); + } + ASSERT_EQ(iter1->Valid(), iter2->Valid()); +} + +void AssertIterEqual(WBWIIteratorImpl* wbwii, + const std::vector& keys) { + wbwii->SeekToFirst(); + for (auto k : keys) { + ASSERT_TRUE(wbwii->Valid()); + ASSERT_EQ(wbwii->Entry().key, k); + wbwii->NextKey(); + } + ASSERT_FALSE(wbwii->Valid()); + wbwii->SeekToLast(); + for (auto kit = keys.rbegin(); kit != keys.rend(); ++kit) { + ASSERT_TRUE(wbwii->Valid()); + ASSERT_EQ(wbwii->Entry().key, *kit); + wbwii->PrevKey(); + } + ASSERT_FALSE(wbwii->Valid()); +} } // namespace anonymous -class WriteBatchWithIndexTest : public testing::Test {}; +class WBWIBaseTest : public testing::Test { + public: + explicit WBWIBaseTest(bool overwrite) : db_(nullptr) { + options_.merge_operator = + MergeOperators::CreateFromStringId("stringappend"); + options_.create_if_missing = true; + dbname_ = test::PerThreadDBPath("write_batch_with_index_test"); + DestroyDB(dbname_, options_); + batch_.reset(new WriteBatchWithIndex(BytewiseComparator(), 20, overwrite)); + } + + virtual ~WBWIBaseTest() { + if (db_ != nullptr) { + ReleaseSnapshot(); + delete db_; + DestroyDB(dbname_, options_); + } + } + + std::string AddToBatch(ColumnFamilyHandle* cf, const std::string& key) { + std::string result; + for (size_t i = 0; i < key.size(); i++) { + if (key[i] == 'd') { + batch_->Delete(cf, key); + result = ""; + } else if (key[i] == 'p') { + result = key + ToString(i); + batch_->Put(cf, key, result); + } else if (key[i] == 'm') { + std::string value = key + ToString(i); + batch_->Merge(cf, key, value); + if (result.empty()) { + result = value; + } else { + result = result + "," + value; + } + } + } + return result; + } + + virtual Status OpenDB() { return DB::Open(options_, dbname_, &db_); } + + void ReleaseSnapshot() { + if (read_opts_.snapshot != nullptr) { + EXPECT_NE(db_, nullptr); + db_->ReleaseSnapshot(read_opts_.snapshot); + read_opts_.snapshot = nullptr; + } + } + + public: + DB* db_; + std::string dbname_; + Options options_; + WriteOptions write_opts_; + ReadOptions read_opts_; + std::unique_ptr batch_; +}; + +class WBWIKeepTest : public WBWIBaseTest { + public: + WBWIKeepTest() : WBWIBaseTest(false) {} +}; + +class WBWIOverwriteTest : public WBWIBaseTest { + public: + WBWIOverwriteTest() : WBWIBaseTest(true) {} +}; +class WriteBatchWithIndexTest : public WBWIBaseTest, + public testing::WithParamInterface { + public: + WriteBatchWithIndexTest() : WBWIBaseTest(GetParam()) {} +}; void TestValueAsSecondaryIndexHelper(std::vector entries, WriteBatchWithIndex* batch) { @@ -273,7 +508,7 @@ void TestValueAsSecondaryIndexHelper(std::vector entries, } } -TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { +TEST_F(WBWIKeepTest, TestValueAsSecondaryIndex) { Entry entries[] = { {"aaa", "0005", kPutRecord}, {"b", "0002", kPutRecord}, @@ -286,12 +521,12 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { }; std::vector entries_list(entries, entries + 8); - WriteBatchWithIndex batch(nullptr, 20); + batch_.reset(new WriteBatchWithIndex(nullptr, 20, false)); - TestValueAsSecondaryIndexHelper(entries_list, &batch); + TestValueAsSecondaryIndexHelper(entries_list, batch_.get()); // Clear batch and re-run test with new values - batch.Clear(); + batch_->Clear(); Entry new_entries[] = { {"aaa", "0005", kPutRecord}, @@ -306,30 +541,29 @@ TEST_F(WriteBatchWithIndexTest, TestValueAsSecondaryIndex) { entries_list = std::vector(new_entries, new_entries + 8); - TestValueAsSecondaryIndexHelper(entries_list, &batch); + TestValueAsSecondaryIndexHelper(entries_list, batch_.get()); } -TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { +TEST_P(WriteBatchWithIndexTest, TestComparatorForCF) { ColumnFamilyHandleImplDummy cf1(6, nullptr); ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20); - ASSERT_OK(batch.Put(&cf1, "ddd", "")); - ASSERT_OK(batch.Put(&cf2, "aaa", "")); - ASSERT_OK(batch.Put(&cf2, "eee", "")); - ASSERT_OK(batch.Put(&cf1, "ccc", "")); - ASSERT_OK(batch.Put(&reverse_cf, "a11", "")); - ASSERT_OK(batch.Put(&cf1, "bbb", "")); + ASSERT_OK(batch_->Put(&cf1, "ddd", "")); + ASSERT_OK(batch_->Put(&cf2, "aaa", "")); + ASSERT_OK(batch_->Put(&cf2, "eee", "")); + ASSERT_OK(batch_->Put(&cf1, "ccc", "")); + ASSERT_OK(batch_->Put(&reverse_cf, "a11", "")); + ASSERT_OK(batch_->Put(&cf1, "bbb", "")); Slice key_slices[] = {"a", "3", "3"}; Slice value_slice = ""; - ASSERT_OK(batch.Put(&reverse_cf, SliceParts(key_slices, 3), - SliceParts(&value_slice, 1))); - ASSERT_OK(batch.Put(&reverse_cf, "a22", "")); + ASSERT_OK(batch_->Put(&reverse_cf, SliceParts(key_slices, 3), + SliceParts(&value_slice, 1))); + ASSERT_OK(batch_->Put(&reverse_cf, "a22", "")); { - std::unique_ptr iter(batch.NewIterator(&cf1)); + std::unique_ptr iter(batch_->NewIterator(&cf1)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -348,7 +582,7 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { } { - std::unique_ptr iter(batch.NewIterator(&cf2)); + std::unique_ptr iter(batch_->NewIterator(&cf2)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -363,7 +597,7 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { } { - std::unique_ptr iter(batch.NewIterator(&reverse_cf)); + std::unique_ptr iter(batch_->NewIterator(&reverse_cf)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(!iter->Valid()); @@ -396,29 +630,28 @@ TEST_F(WriteBatchWithIndexTest, TestComparatorForCF) { } } -TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { +TEST_F(WBWIOverwriteTest, TestOverwriteKey) { ColumnFamilyHandleImplDummy cf1(6, nullptr); ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); - ASSERT_OK(batch.Put(&cf1, "ddd", "")); - ASSERT_OK(batch.Merge(&cf1, "ddd", "")); - ASSERT_OK(batch.Delete(&cf1, "ddd")); - ASSERT_OK(batch.Put(&cf2, "aaa", "")); - ASSERT_OK(batch.Delete(&cf2, "aaa")); - ASSERT_OK(batch.Put(&cf2, "aaa", "aaa")); - ASSERT_OK(batch.Put(&cf2, "eee", "eee")); - ASSERT_OK(batch.Put(&cf1, "ccc", "")); - ASSERT_OK(batch.Put(&reverse_cf, "a11", "")); - ASSERT_OK(batch.Delete(&cf1, "ccc")); - ASSERT_OK(batch.Put(&reverse_cf, "a33", "a33")); - ASSERT_OK(batch.Put(&reverse_cf, "a11", "a11")); + ASSERT_OK(batch_->Merge(&cf1, "ddd", "")); + ASSERT_OK(batch_->Put(&cf1, "ddd", "")); + ASSERT_OK(batch_->Delete(&cf1, "ddd")); + ASSERT_OK(batch_->Put(&cf2, "aaa", "")); + ASSERT_OK(batch_->Delete(&cf2, "aaa")); + ASSERT_OK(batch_->Put(&cf2, "aaa", "aaa")); + ASSERT_OK(batch_->Put(&cf2, "eee", "eee")); + ASSERT_OK(batch_->Put(&cf1, "ccc", "")); + ASSERT_OK(batch_->Put(&reverse_cf, "a11", "")); + ASSERT_OK(batch_->Delete(&cf1, "ccc")); + ASSERT_OK(batch_->Put(&reverse_cf, "a33", "a33")); + ASSERT_OK(batch_->Put(&reverse_cf, "a11", "a11")); Slice slices[] = {"a", "3", "3"}; - ASSERT_OK(batch.Delete(&reverse_cf, SliceParts(slices, 3))); + ASSERT_OK(batch_->Delete(&reverse_cf, SliceParts(slices, 3))); { - std::unique_ptr iter(batch.NewIterator(&cf1)); + std::unique_ptr iter(batch_->NewIterator(&cf1)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -435,7 +668,7 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { } { - std::unique_ptr iter(batch.NewIterator(&cf2)); + std::unique_ptr iter(batch_->NewIterator(&cf2)); iter->SeekToLast(); ASSERT_OK(iter->status()); ASSERT_TRUE(iter->Valid()); @@ -466,7 +699,7 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { } { - std::unique_ptr iter(batch.NewIterator(&reverse_cf)); + std::unique_ptr iter(batch_->NewIterator(&reverse_cf)); iter->Seek(""); ASSERT_OK(iter->status()); ASSERT_TRUE(!iter->Valid()); @@ -500,64 +733,33 @@ TEST_F(WriteBatchWithIndexTest, TestOverwriteKey) { } } -namespace { -typedef std::map KVMap; - -class KVIter : public Iterator { - public: - explicit KVIter(const KVMap* map) : map_(map), iter_(map_->end()) {} - bool Valid() const override { return iter_ != map_->end(); } - void SeekToFirst() override { iter_ = map_->begin(); } - void SeekToLast() override { - if (map_->empty()) { - iter_ = map_->end(); - } else { - iter_ = map_->find(map_->rbegin()->first); - } - } - void Seek(const Slice& k) override { - iter_ = map_->lower_bound(k.ToString()); - } - void SeekForPrev(const Slice& k) override { - iter_ = map_->upper_bound(k.ToString()); - Prev(); - } - void Next() override { ++iter_; } - void Prev() override { - if (iter_ == map_->begin()) { - iter_ = map_->end(); - return; - } - --iter_; - } - - Slice key() const override { return iter_->first; } - Slice value() const override { return iter_->second; } - Status status() const override { return Status::OK(); } - - private: - const KVMap* const map_; - KVMap::const_iterator iter_; -}; - -void AssertIter(Iterator* iter, const std::string& key, - const std::string& value) { - ASSERT_OK(iter->status()); - ASSERT_TRUE(iter->Valid()); - ASSERT_EQ(key, iter->key().ToString()); - ASSERT_EQ(value, iter->value().ToString()); +TEST_P(WriteBatchWithIndexTest, TestWBWIIterator) { + ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); + ASSERT_OK(batch_->Put(&cf1, "a", "a1")); + ASSERT_OK(batch_->Put(&cf1, "c", "c1")); + ASSERT_OK(batch_->Put(&cf1, "c", "c2")); + ASSERT_OK(batch_->Put(&cf1, "e", "e1")); + ASSERT_OK(batch_->Put(&cf1, "e", "e2")); + ASSERT_OK(batch_->Put(&cf1, "e", "e3")); + std::unique_ptr iter1( + static_cast(batch_->NewIterator(&cf1))); + std::unique_ptr iter2( + static_cast(batch_->NewIterator(&cf2))); + AssertIterEqual(iter1.get(), {"a", "c", "e"}); + AssertIterEqual(iter2.get(), {}); + ASSERT_OK(batch_->Put(&cf2, "a", "a2")); + ASSERT_OK(batch_->Merge(&cf2, "b", "b1")); + ASSERT_OK(batch_->Merge(&cf2, "b", "b2")); + ASSERT_OK(batch_->Delete(&cf2, "d")); + ASSERT_OK(batch_->Merge(&cf2, "d", "d2")); + ASSERT_OK(batch_->Merge(&cf2, "d", "d3")); + ASSERT_OK(batch_->Delete(&cf2, "f")); + AssertIterEqual(iter1.get(), {"a", "c", "e"}); + AssertIterEqual(iter2.get(), {"a", "b", "d", "f"}); } -void AssertItersEqual(Iterator* iter1, Iterator* iter2) { - ASSERT_EQ(iter1->Valid(), iter2->Valid()); - if (iter1->Valid()) { - ASSERT_EQ(iter1->key().ToString(), iter2->key().ToString()); - ASSERT_EQ(iter1->value().ToString(), iter2->value().ToString()); - } -} -} // namespace - -TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { +TEST_P(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { std::vector source_strings = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}; for (int rand_seed = 301; rand_seed < 366; rand_seed++) { @@ -566,14 +768,13 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator()); - - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + batch_->Clear(); if (rand_seed % 2 == 0) { - ASSERT_OK(batch.Put(&cf2, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf2, "zoo", "bar")); } if (rand_seed % 4 == 1) { - ASSERT_OK(batch.Put(&cf3, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf3, "zoo", "bar")); } KVMap map; @@ -589,24 +790,24 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { break; case 1: // only delta has it - ASSERT_OK(batch.Put(&cf1, key, value)); + ASSERT_OK(batch_->Put(&cf1, key, value)); map[key] = value; merged_map[key] = value; break; case 2: // both has it. Delta should win - ASSERT_OK(batch.Put(&cf1, key, value)); + ASSERT_OK(batch_->Put(&cf1, key, value)); map[key] = "wrong_value"; merged_map[key] = value; break; case 3: // both has it. Delta is delete - ASSERT_OK(batch.Delete(&cf1, key)); + ASSERT_OK(batch_->Delete(&cf1, key)); map[key] = "wrong_value"; break; case 4: // only delta has it. Delta is delete - ASSERT_OK(batch.Delete(&cf1, key)); + ASSERT_OK(batch_->Delete(&cf1, key)); map[key] = "wrong_value"; break; default: @@ -616,7 +817,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { } std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); std::unique_ptr result_iter(new KVIter(&merged_map)); bool is_valid = false; @@ -672,7 +873,7 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { } break; } - AssertItersEqual(iter.get(), result_iter.get()); + AssertItersMatch(iter.get(), result_iter.get()); is_valid = iter->Valid(); } @@ -680,18 +881,16 @@ TEST_F(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { } } -TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { +TEST_P(WriteBatchWithIndexTest, TestIteraratorWithBase) { ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); - { KVMap map; map["a"] = "aa"; map["c"] = "cc"; map["e"] = "ee"; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -724,12 +923,12 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { } // Test the case that there is one element in the write batch - ASSERT_OK(batch.Put(&cf2, "zoo", "bar")); - ASSERT_OK(batch.Put(&cf1, "a", "aa")); + ASSERT_OK(batch_->Put(&cf2, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf1, "a", "aa")); { KVMap empty_map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -738,10 +937,10 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { ASSERT_TRUE(!iter->Valid()); } - ASSERT_OK(batch.Delete(&cf1, "b")); - ASSERT_OK(batch.Put(&cf1, "c", "cc")); - ASSERT_OK(batch.Put(&cf1, "d", "dd")); - ASSERT_OK(batch.Delete(&cf1, "e")); + ASSERT_OK(batch_->Delete(&cf1, "b")); + ASSERT_OK(batch_->Put(&cf1, "c", "cc")); + ASSERT_OK(batch_->Put(&cf1, "d", "dd")); + ASSERT_OK(batch_->Delete(&cf1, "e")); { KVMap map; @@ -749,7 +948,7 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { map["cc"] = "cccc"; map["f"] = "ff"; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -807,7 +1006,7 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { { KVMap empty_map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -843,18 +1042,17 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBase) { } } -TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { +TEST_P(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator()); ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator()); - WriteBatchWithIndex batch(BytewiseComparator(), 20, true); // Test the case that there is one element in the write batch - ASSERT_OK(batch.Put(&cf2, "zoo", "bar")); - ASSERT_OK(batch.Put(&cf1, "a", "aa")); + ASSERT_OK(batch_->Put(&cf2, "zoo", "bar")); + ASSERT_OK(batch_->Put(&cf1, "a", "aa")); { KVMap empty_map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "aa"); @@ -863,11 +1061,11 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { ASSERT_TRUE(!iter->Valid()); } - ASSERT_OK(batch.Put(&cf1, "c", "cc")); + ASSERT_OK(batch_->Put(&cf1, "c", "cc")); { KVMap map; std::unique_ptr iter( - batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + batch_->NewIteratorWithBase(&cf1, new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "c", "cc"); @@ -896,11 +1094,12 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { } // default column family - ASSERT_OK(batch.Put("a", "b")); + ASSERT_OK(batch_->Put("a", "b")); { KVMap map; map["b"] = ""; - std::unique_ptr iter(batch.NewIteratorWithBase(new KVIter(&map))); + std::unique_ptr iter( + batch_->NewIteratorWithBase(new KVIter(&map))); iter->SeekToFirst(); AssertIter(iter.get(), "a", "b"); @@ -929,416 +1128,288 @@ TEST_F(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { } } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatch) { +TEST_P(WriteBatchWithIndexTest, TestGetFromBatch) { Options options; - WriteBatchWithIndex batch; Status s; std::string value; - s = batch.GetFromBatch(options, "b", &value); + s = batch_->GetFromBatch(options_, "b", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Put("a", "a")); - ASSERT_OK(batch.Put("b", "b")); - ASSERT_OK(batch.Put("c", "c")); - ASSERT_OK(batch.Put("a", "z")); - ASSERT_OK(batch.Delete("c")); - ASSERT_OK(batch.Delete("d")); - ASSERT_OK(batch.Delete("e")); - ASSERT_OK(batch.Put("e", "e")); + ASSERT_OK(batch_->Put("a", "a")); + ASSERT_OK(batch_->Put("b", "b")); + ASSERT_OK(batch_->Put("c", "c")); + ASSERT_OK(batch_->Put("a", "z")); + ASSERT_OK(batch_->Delete("c")); + ASSERT_OK(batch_->Delete("d")); + ASSERT_OK(batch_->Delete("e")); + ASSERT_OK(batch_->Put("e", "e")); - s = batch.GetFromBatch(options, "b", &value); + s = batch_->GetFromBatch(options_, "b", &value); ASSERT_OK(s); ASSERT_EQ("b", value); - s = batch.GetFromBatch(options, "a", &value); + s = batch_->GetFromBatch(options_, "a", &value); ASSERT_OK(s); ASSERT_EQ("z", value); - s = batch.GetFromBatch(options, "c", &value); + s = batch_->GetFromBatch(options_, "c", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(options, "d", &value); + s = batch_->GetFromBatch(options_, "d", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(options, "x", &value); + s = batch_->GetFromBatch(options_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(options, "e", &value); + s = batch_->GetFromBatch(options_, "e", &value); ASSERT_OK(s); ASSERT_EQ("e", value); - ASSERT_OK(batch.Merge("z", "z")); + ASSERT_OK(batch_->Merge("z", "z")); - s = batch.GetFromBatch(options, "z", &value); + s = batch_->GetFromBatch(options_, "z", &value); ASSERT_NOK(s); // No merge operator specified. - s = batch.GetFromBatch(options, "b", &value); + s = batch_->GetFromBatch(options_, "b", &value); ASSERT_OK(s); ASSERT_EQ("b", value); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge) { - DB* db; - Options options; - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - options.create_if_missing = true; - - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchMerge) { + Status s = OpenDB(); ASSERT_OK(s); - ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); - WriteBatchWithIndex batch; + ColumnFamilyHandle* column_family = db_->DefaultColumnFamily(); std::string value; - s = batch.GetFromBatch(options, "x", &value); + s = batch_->GetFromBatch(options_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Put("x", "X")); + ASSERT_OK(batch_->Put("x", "X")); std::string expected = "X"; for (int i = 0; i < 5; i++) { - ASSERT_OK(batch.Merge("x", ToString(i))); + ASSERT_OK(batch_->Merge("x", ToString(i))); expected = expected + "," + ToString(i); if (i % 2 == 0) { - ASSERT_OK(batch.Put("y", ToString(i / 2))); + ASSERT_OK(batch_->Put("y", ToString(i / 2))); } - ASSERT_OK(batch.Merge("z", "z")); + ASSERT_OK(batch_->Merge("z", "z")); - s = batch.GetFromBatch(column_family, options, "x", &value); + s = batch_->GetFromBatch(column_family, options_, "x", &value); ASSERT_OK(s); ASSERT_EQ(expected, value); - s = batch.GetFromBatch(column_family, options, "y", &value); + s = batch_->GetFromBatch(column_family, options_, "y", &value); ASSERT_OK(s); ASSERT_EQ(ToString(i / 2), value); - s = batch.GetFromBatch(column_family, options, "z", &value); + s = batch_->GetFromBatch(column_family, options_, "z", &value); ASSERT_TRUE(s.IsMergeInProgress()); } - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchMerge2) { - DB* db; - Options options; - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - options.create_if_missing = true; - - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_F(WBWIOverwriteTest, TestGetFromBatchMerge2) { + Status s = OpenDB(); ASSERT_OK(s); - ColumnFamilyHandle* column_family = db->DefaultColumnFamily(); - - // Test batch with overwrite_key=true - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); + ColumnFamilyHandle* column_family = db_->DefaultColumnFamily(); std::string value; - s = batch.GetFromBatch(column_family, options, "X", &value); + s = batch_->GetFromBatch(column_family, options_, "X", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Put(column_family, "X", "x")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->Put(column_family, "X", "x")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); ASSERT_EQ("x", value); - ASSERT_OK(batch.Put(column_family, "X", "x2")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->Put(column_family, "X", "x2")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); ASSERT_EQ("x2", value); - ASSERT_OK(batch.Merge(column_family, "X", "aaa")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge(column_family, "X", "aaa")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("x2,aaa", value); - ASSERT_OK(batch.Merge(column_family, "X", "bbb")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge(column_family, "X", "bbb")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("x2,aaa,bbb", value); - ASSERT_OK(batch.Put(column_family, "X", "x3")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->Put(column_family, "X", "x3")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); ASSERT_EQ("x3", value); - ASSERT_OK(batch.Merge(column_family, "X", "ccc")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge(column_family, "X", "ccc")); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("x3,ccc", value); - ASSERT_OK(batch.Delete(column_family, "X")); - s = batch.GetFromBatch(column_family, options, "X", &value); + ASSERT_OK(batch_->Delete(column_family, "X")); + s = batch_->GetFromBatch(column_family, options_, "X", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Merge(column_family, "X", "ddd")); - s = batch.GetFromBatch(column_family, options, "X", &value); - ASSERT_TRUE(s.IsMergeInProgress()); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); + batch_->Merge(column_family, "X", "ddd"); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "X", &value)); + ASSERT_EQ("ddd", value); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { - DB* db; - Options options; - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDB) { + ASSERT_OK(OpenDB()); - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); - ASSERT_OK(s); - - WriteBatchWithIndex batch; - ReadOptions read_options; - WriteOptions write_options; std::string value; - s = db->Put(write_options, "a", "a"); - ASSERT_OK(s); + ASSERT_OK(db_->Put(write_opts_, "a", "a")); + ASSERT_OK(db_->Put(write_opts_, "b", "b")); + ASSERT_OK(db_->Put(write_opts_, "c", "c")); - s = db->Put(write_options, "b", "b"); - ASSERT_OK(s); + ASSERT_OK(batch_->Put("a", "batch_->a")); + ASSERT_OK(batch_->Delete("b")); - s = db->Put(write_options, "c", "c"); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); + ASSERT_EQ("batch_->a", value); - ASSERT_OK(batch.Put("a", "batch.a")); - ASSERT_OK(batch.Delete("b")); - - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); - ASSERT_OK(s); - ASSERT_EQ("batch.a", value); - - s = batch.GetFromBatchAndDB(db, read_options, "b", &value); + Status s = batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatchAndDB(db, read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); ASSERT_EQ("c", value); - s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(db->Delete(write_options, "x")); + ASSERT_OK(db_->Delete(write_opts_, "x")); - s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { - DB* db; - Options options; - - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge) { + Status s = OpenDB(); ASSERT_OK(s); - WriteBatchWithIndex batch; - ReadOptions read_options; - WriteOptions write_options; std::string value; - s = db->Put(write_options, "a", "a0"); - ASSERT_OK(s); + ASSERT_OK(db_->Put(write_opts_, "a", "a0")); + ASSERT_OK(db_->Put(write_opts_, "b", "b0")); + ASSERT_OK(db_->Merge(write_opts_, "b", "b1")); + ASSERT_OK(db_->Merge(write_opts_, "c", "c0")); + ASSERT_OK(db_->Merge(write_opts_, "d", "d0")); - s = db->Put(write_options, "b", "b0"); - ASSERT_OK(s); + ASSERT_OK(batch_->Merge("a", "a1")); + ASSERT_OK(batch_->Merge("a", "a2")); + ASSERT_OK(batch_->Merge("b", "b2")); + ASSERT_OK(batch_->Merge("d", "d1")); + ASSERT_OK(batch_->Merge("e", "e0")); - s = db->Merge(write_options, "b", "b1"); - ASSERT_OK(s); - - s = db->Merge(write_options, "c", "c0"); - ASSERT_OK(s); - - s = db->Merge(write_options, "d", "d0"); - ASSERT_OK(s); - - ASSERT_OK(batch.Merge("a", "a1")); - ASSERT_OK(batch.Merge("a", "a2")); - ASSERT_OK(batch.Merge("b", "b2")); - ASSERT_OK(batch.Merge("d", "d1")); - ASSERT_OK(batch.Merge("e", "e0")); - - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); ASSERT_EQ("a0,a1,a2", value); - s = batch.GetFromBatchAndDB(db, read_options, "b", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value)); ASSERT_EQ("b0,b1,b2", value); - s = batch.GetFromBatchAndDB(db, read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); ASSERT_EQ("c0", value); - s = batch.GetFromBatchAndDB(db, read_options, "d", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "d", &value)); ASSERT_EQ("d0,d1", value); - s = batch.GetFromBatchAndDB(db, read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); ASSERT_EQ("e0", value); - s = db->Delete(write_options, "x"); - ASSERT_OK(s); + ASSERT_OK(db_->Delete(write_opts_, "x")); - s = batch.GetFromBatchAndDB(db, read_options, "x", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "x", &value); ASSERT_TRUE(s.IsNotFound()); - const Snapshot* snapshot = db->GetSnapshot(); + const Snapshot* snapshot = db_->GetSnapshot(); ReadOptions snapshot_read_options; snapshot_read_options.snapshot = snapshot; - s = db->Delete(write_options, "a"); - ASSERT_OK(s); + ASSERT_OK(db_->Delete(write_opts_, "a")); - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); ASSERT_EQ("a1,a2", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); - ASSERT_OK(s); + ASSERT_OK( + s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "a", &value)); ASSERT_EQ("a0,a1,a2", value); - ASSERT_OK(batch.Delete("a")); + ASSERT_OK(batch_->Delete("a")); - s = batch.GetFromBatchAndDB(db, read_options, "a", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "a", &value); + s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "a", &value); ASSERT_TRUE(s.IsNotFound()); - s = db->Merge(write_options, "c", "c1"); - ASSERT_OK(s); + ASSERT_OK(s = db_->Merge(write_opts_, "c", "c1")); - s = batch.GetFromBatchAndDB(db, read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK(s = batch_->GetFromBatchAndDB(db_, read_opts_, "c", &value)); ASSERT_EQ("c0,c1", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "c", &value); - ASSERT_OK(s); + ASSERT_OK( + s = batch_->GetFromBatchAndDB(db_, snapshot_read_options, "c", &value)); ASSERT_EQ("c0", value); - s = db->Put(write_options, "e", "e1"); - ASSERT_OK(s); - - s = batch.GetFromBatchAndDB(db, read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(db_->Put(write_opts_, "e", "e1")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); ASSERT_EQ("e1,e0", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, snapshot_read_options, "e", &value)); ASSERT_EQ("e0", value); - s = db->Delete(write_options, "e"); - ASSERT_OK(s); - - s = batch.GetFromBatchAndDB(db, read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(s = db_->Delete(write_opts_, "e")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "e", &value)); ASSERT_EQ("e0", value); - s = batch.GetFromBatchAndDB(db, snapshot_read_options, "e", &value); - ASSERT_OK(s); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, snapshot_read_options, "e", &value)); ASSERT_EQ("e0", value); - db->ReleaseSnapshot(snapshot); - delete db; - EXPECT_OK(DestroyDB(dbname, options)); + db_->ReleaseSnapshot(snapshot); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge2) { - DB* db; - Options options; - - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_F(WBWIOverwriteTest, TestGetFromBatchAndDBMerge2) { + Status s = OpenDB(); ASSERT_OK(s); - // Test batch with overwrite_key=true - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); - - ReadOptions read_options; - WriteOptions write_options; std::string value; - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - ASSERT_OK(batch.Merge("A", "xxx")); + ASSERT_OK(batch_->Merge("A", "xxx")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); + ASSERT_EQ(value, "xxx"); - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Merge("A", "yyy")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); + ASSERT_EQ(value, "xxx,yyy"); - ASSERT_OK(batch.Merge("A", "yyy")); + ASSERT_OK(db_->Put(write_opts_, "A", "a0")); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); + ASSERT_EQ(value, "a0,xxx,yyy"); - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); + ASSERT_OK(batch_->Delete("A")); - s = db->Put(write_options, "A", "a0"); - ASSERT_OK(s); - - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); - ASSERT_TRUE(s.IsMergeInProgress()); - - ASSERT_OK(batch.Delete("A")); - - s = batch.GetFromBatchAndDB(db, read_options, "A", &value); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } -TEST_F(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) { - DB* db; - Options options; - - options.create_if_missing = true; - std::string dbname = test::PerThreadDBPath("write_batch_with_index_test"); - - options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); - - EXPECT_OK(DestroyDB(dbname, options)); - Status s = DB::Open(options, dbname, &db); +TEST_P(WriteBatchWithIndexTest, TestGetFromBatchAndDBMerge3) { + Status s = OpenDB(); ASSERT_OK(s); - ReadOptions read_options; - WriteOptions write_options; FlushOptions flush_options; std::string value; - WriteBatchWithIndex batch; + ASSERT_OK(db_->Put(write_opts_, "A", "1")); + ASSERT_OK(db_->Flush(flush_options, db_->DefaultColumnFamily())); + ASSERT_OK(batch_->Merge("A", "2")); - ASSERT_OK(db->Put(write_options, "A", "1")); - ASSERT_OK(db->Flush(flush_options, db->DefaultColumnFamily())); - ASSERT_OK(batch.Merge("A", "2")); - - ASSERT_OK(batch.GetFromBatchAndDB(db, read_options, "A", &value)); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "A", &value)); ASSERT_EQ(value, "1,2"); - - delete db; - EXPECT_OK(DestroyDB(dbname, options)); } void AssertKey(std::string key, WBWIIterator* iter) { @@ -1353,25 +1424,24 @@ void AssertValue(std::string value, WBWIIterator* iter) { // Tests that we can write to the WBWI while we iterate (from a single thread). // iteration should see the newest writes -TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingCorrectnessTest) { - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); +TEST_F(WBWIOverwriteTest, MutateWhileIteratingCorrectnessTest) { for (char c = 'a'; c <= 'z'; ++c) { - ASSERT_OK(batch.Put(std::string(1, c), std::string(1, c))); + ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c))); } - std::unique_ptr iter(batch.NewIterator()); + std::unique_ptr iter(batch_->NewIterator()); iter->Seek("k"); AssertKey("k", iter.get()); iter->Next(); AssertKey("l", iter.get()); - ASSERT_OK(batch.Put("ab", "cc")); + ASSERT_OK(batch_->Put("ab", "cc")); iter->Next(); AssertKey("m", iter.get()); - ASSERT_OK(batch.Put("mm", "kk")); + ASSERT_OK(batch_->Put("mm", "kk")); iter->Next(); AssertKey("mm", iter.get()); AssertValue("kk", iter.get()); - ASSERT_OK(batch.Delete("mm")); + ASSERT_OK(batch_->Delete("mm")); iter->Next(); AssertKey("n", iter.get()); @@ -1381,7 +1451,7 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingCorrectnessTest) { iter->Seek("ab"); AssertKey("ab", iter.get()); - ASSERT_OK(batch.Delete("x")); + ASSERT_OK(batch_->Delete("x")); iter->Seek("x"); AssertKey("x", iter.get()); ASSERT_EQ(kDeleteRecord, iter->Entry().type); @@ -1400,10 +1470,10 @@ void AssertIterValue(std::string value, Iterator* iter) { } // same thing as above, but testing IteratorWithBase -TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { +TEST_F(WBWIOverwriteTest, MutateWhileIteratingBaseCorrectnessTest) { WriteBatchWithIndex batch(BytewiseComparator(), 0, true); for (char c = 'a'; c <= 'z'; ++c) { - ASSERT_OK(batch.Put(std::string(1, c), std::string(1, c))); + ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c))); } KVMap map; @@ -1412,20 +1482,19 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { map["ee"] = "ee"; map["em"] = "me"; - std::unique_ptr iter( - batch.NewIteratorWithBase(new KVIter(&map))); + std::unique_ptr iter(batch_->NewIteratorWithBase(new KVIter(&map))); iter->Seek("k"); AssertIterKey("k", iter.get()); iter->Next(); AssertIterKey("l", iter.get()); - ASSERT_OK(batch.Put("ab", "cc")); + ASSERT_OK(batch_->Put("ab", "cc")); iter->Next(); AssertIterKey("m", iter.get()); - ASSERT_OK(batch.Put("mm", "kk")); + ASSERT_OK(batch_->Put("mm", "kk")); iter->Next(); AssertIterKey("mm", iter.get()); AssertIterValue("kk", iter.get()); - ASSERT_OK(batch.Delete("mm")); + ASSERT_OK(batch_->Delete("mm")); iter->Next(); AssertIterKey("n", iter.get()); iter->Prev(); @@ -1438,13 +1507,13 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { AssertIterKey("aa", iter.get()); iter->Prev(); AssertIterKey("a", iter.get()); - ASSERT_OK(batch.Delete("aa")); + ASSERT_OK(batch_->Delete("aa")); iter->Next(); AssertIterKey("ab", iter.get()); iter->Prev(); AssertIterKey("a", iter.get()); - ASSERT_OK(batch.Delete("x")); + ASSERT_OK(batch_->Delete("x")); iter->Seek("x"); AssertIterKey("y", iter.get()); iter->Next(); @@ -1453,11 +1522,11 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { iter->Prev(); AssertIterKey("w", iter.get()); - ASSERT_OK(batch.Delete("e")); + ASSERT_OK(batch_->Delete("e")); iter->Seek("e"); AssertIterKey("ee", iter.get()); AssertIterValue("ee", iter.get()); - ASSERT_OK(batch.Put("ee", "xx")); + ASSERT_OK(batch_->Put("ee", "xx")); // still the same value AssertIterValue("ee", iter.get()); iter->Next(); @@ -1470,10 +1539,9 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseCorrectnessTest) { } // stress testing mutations with IteratorWithBase -TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { - WriteBatchWithIndex batch(BytewiseComparator(), 0, true); +TEST_F(WBWIOverwriteTest, MutateWhileIteratingBaseStressTest) { for (char c = 'a'; c <= 'z'; ++c) { - ASSERT_OK(batch.Put(std::string(1, c), std::string(1, c))); + ASSERT_OK(batch_->Put(std::string(1, c), std::string(1, c))); } KVMap map; @@ -1481,8 +1549,7 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { map[std::string(2, c)] = std::string(2, c); } - std::unique_ptr iter( - batch.NewIteratorWithBase(new KVIter(&map))); + std::unique_ptr iter(batch_->NewIteratorWithBase(new KVIter(&map))); Random rnd(301); for (int i = 0; i < 1000000; ++i) { @@ -1490,16 +1557,16 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { char c = static_cast(rnd.Uniform(26) + 'a'); switch (random) { case 0: - ASSERT_OK(batch.Put(std::string(1, c), "xxx")); + ASSERT_OK(batch_->Put(std::string(1, c), "xxx")); break; case 1: - ASSERT_OK(batch.Put(std::string(2, c), "xxx")); + ASSERT_OK(batch_->Put(std::string(2, c), "xxx")); break; case 2: - ASSERT_OK(batch.Delete(std::string(1, c))); + ASSERT_OK(batch_->Delete(std::string(1, c))); break; case 3: - ASSERT_OK(batch.Delete(std::string(2, c))); + ASSERT_OK(batch_->Delete(std::string(2, c))); break; case 4: iter->Seek(std::string(1, c)); @@ -1524,330 +1591,523 @@ TEST_F(WriteBatchWithIndexTest, MutateWhileIteratingBaseStressTest) { ASSERT_OK(iter->status()); } -static void PrintContents(WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family, - std::string* result) { - WBWIIterator* iter; - if (column_family == nullptr) { - iter = batch->NewIterator(); - } else { - iter = batch->NewIterator(column_family); - } - - iter->SeekToFirst(); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - - WriteEntry e = iter->Entry(); - - if (e.type == kPutRecord) { - result->append("PUT("); - result->append(e.key.ToString()); - result->append("):"); - result->append(e.value.ToString()); - } else if (e.type == kMergeRecord) { - result->append("MERGE("); - result->append(e.key.ToString()); - result->append("):"); - result->append(e.value.ToString()); - } else if (e.type == kSingleDeleteRecord) { - result->append("SINGLE-DEL("); - result->append(e.key.ToString()); - result->append(")"); - } else { - assert(e.type == kDeleteRecord); - result->append("DEL("); - result->append(e.key.ToString()); - result->append(")"); - } - - result->append(","); - iter->Next(); - } - - ASSERT_OK(iter->status()); - - delete iter; -} - -static std::string PrintContents(WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family) { - std::string result; - PrintContents(batch, column_family, &result); - return result; -} - -static void PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, - ColumnFamilyHandle* column_family, - std::string* result) { - Iterator* iter; - if (column_family == nullptr) { - iter = batch->NewIteratorWithBase(new KVIter(base_map)); - } else { - iter = batch->NewIteratorWithBase(column_family, new KVIter(base_map)); - } - - iter->SeekToFirst(); - while (iter->Valid()) { - ASSERT_OK(iter->status()); - - Slice key = iter->key(); - Slice value = iter->value(); - - result->append(key.ToString()); - result->append(":"); - result->append(value.ToString()); - result->append(","); - - iter->Next(); - } - - ASSERT_OK(iter->status()); - - delete iter; -} - -static std::string PrintContents(WriteBatchWithIndex* batch, KVMap* base_map, - ColumnFamilyHandle* column_family) { - std::string result; - PrintContents(batch, base_map, column_family, &result); - return result; -} - -TEST_F(WriteBatchWithIndexTest, SavePointTest) { - WriteBatchWithIndex batch; +TEST_P(WriteBatchWithIndexTest, SavePointTest) { ColumnFamilyHandleImplDummy cf1(1, BytewiseComparator()); + KVMap empty_map; + std::unique_ptr cf0_iter( + batch_->NewIteratorWithBase(new KVIter(&empty_map))); + std::unique_ptr cf1_iter( + batch_->NewIteratorWithBase(&cf1, new KVIter(&empty_map))); Status s; + KVMap kvm_cf0_0 = {{"A", "aa"}, {"B", "b"}}; + KVMap kvm_cf1_0 = {{"A", "a1"}, {"C", "c1"}, {"E", "e1"}}; + KVIter kvi_cf0_0(&kvm_cf0_0); + KVIter kvi_cf1_0(&kvm_cf1_0); - ASSERT_OK(batch.Put("A", "a")); - ASSERT_OK(batch.Put("B", "b")); - ASSERT_OK(batch.Put("A", "aa")); - ASSERT_OK(batch.Put(&cf1, "A", "a1")); - ASSERT_OK(batch.Delete(&cf1, "B")); - ASSERT_OK(batch.Put(&cf1, "C", "c1")); - ASSERT_OK(batch.Put(&cf1, "E", "e1")); + ASSERT_OK(batch_->Put("A", "a")); + ASSERT_OK(batch_->Put("B", "b")); + ASSERT_OK(batch_->Put("A", "aa")); + ASSERT_OK(batch_->Put(&cf1, "A", "a1")); + ASSERT_OK(batch_->Delete(&cf1, "B")); + ASSERT_OK(batch_->Put(&cf1, "C", "c1")); + ASSERT_OK(batch_->Put(&cf1, "E", "e1")); - batch.SetSavePoint(); // 1 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_0); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_0); + batch_->SetSavePoint(); // 1 - ASSERT_OK(batch.Put("C", "cc")); - ASSERT_OK(batch.Put("B", "bb")); - ASSERT_OK(batch.Delete("A")); - ASSERT_OK(batch.Put(&cf1, "B", "b1")); - ASSERT_OK(batch.Delete(&cf1, "A")); - ASSERT_OK(batch.SingleDelete(&cf1, "E")); - batch.SetSavePoint(); // 2 + KVMap kvm_cf0_1 = {{"B", "bb"}, {"C", "cc"}}; + KVMap kvm_cf1_1 = {{"B", "b1"}, {"C", "c1"}}; + KVIter kvi_cf0_1(&kvm_cf0_1); + KVIter kvi_cf1_1(&kvm_cf1_1); - ASSERT_OK(batch.Put("A", "aaa")); - ASSERT_OK(batch.Put("A", "xxx")); - ASSERT_OK(batch.Delete("B")); - ASSERT_OK(batch.Put(&cf1, "B", "b2")); - ASSERT_OK(batch.Delete(&cf1, "C")); - batch.SetSavePoint(); // 3 - batch.SetSavePoint(); // 4 - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.Delete(&cf1, "D")); - ASSERT_OK(batch.Delete(&cf1, "E")); + ASSERT_OK(batch_->Put("C", "cc")); + ASSERT_OK(batch_->Put("B", "bb")); + ASSERT_OK(batch_->Delete("A")); + ASSERT_OK(batch_->Put(&cf1, "B", "b1")); + ASSERT_OK(batch_->Delete(&cf1, "A")); + ASSERT_OK(batch_->SingleDelete(&cf1, "E")); + batch_->SetSavePoint(); // 2 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_1); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_1); - ASSERT_EQ( - "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" - "B)" - ",PUT(C):cc,SINGLE-DEL(D),", - PrintContents(&batch, nullptr)); + KVMap kvm_cf0_2 = {{"A", "xxx"}, {"C", "cc"}}; + KVMap kvm_cf1_2 = {{"B", "b2"}}; + KVIter kvi_cf0_2(&kvm_cf0_2); + KVIter kvi_cf1_2(&kvm_cf1_2); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," - "DEL(D),PUT(E):e1,SINGLE-DEL(E),DEL(E),", - PrintContents(&batch, &cf1)); + ASSERT_OK(batch_->Put("A", "aaa")); + ASSERT_OK(batch_->Put("A", "xxx")); + ASSERT_OK(batch_->Delete("B")); + ASSERT_OK(batch_->Put(&cf1, "B", "b2")); + ASSERT_OK(batch_->Delete(&cf1, "C")); + batch_->SetSavePoint(); // 3 + batch_->SetSavePoint(); // 4 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_2); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_2); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 4 - ASSERT_EQ( - "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" - "B)" - ",PUT(C):cc,", - PrintContents(&batch, nullptr)); + KVMap kvm_cf0_4 = {{"A", "xxx"}, {"C", "cc"}}; + KVMap kvm_cf1_4 = {{"B", "b2"}}; + KVIter kvi_cf0_4(&kvm_cf0_4); + KVIter kvi_cf1_4(&kvm_cf1_4); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->Delete(&cf1, "D")); + ASSERT_OK(batch_->Delete(&cf1, "E")); + AssertItersEqual(cf0_iter.get(), &kvi_cf0_4); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_4); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 4 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_2); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_2); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 3 - ASSERT_EQ( - "PUT(A):a,PUT(A):aa,DEL(A),PUT(A):aaa,PUT(A):xxx,PUT(B):b,PUT(B):bb,DEL(" - "B)" - ",PUT(C):cc,", - PrintContents(&batch, nullptr)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 3 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_2); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_2); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(B):b2,PUT(C):c1,DEL(C)," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 2 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_1); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_1); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 2 - ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", - PrintContents(&batch, nullptr)); + batch_->SetSavePoint(); // 5 + ASSERT_OK(batch_->Put("X", "x")); - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); + KVMap kvm_cf0_5 = {{"B", "bb"}, {"C", "cc"}, {"X", "x"}}; + KVIter kvi_cf0_5(&kvm_cf0_5); + KVIter kvi_cf1_5(&kvm_cf1_1); + AssertItersEqual(cf0_iter.get(), &kvi_cf0_5); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_5); - batch.SetSavePoint(); // 5 - ASSERT_OK(batch.Put("X", "x")); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 5 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_1); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_1); - ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,PUT(X):x,", - PrintContents(&batch, nullptr)); + ASSERT_OK(batch_->RollbackToSavePoint()); // rollback to 1 + AssertItersEqual(cf0_iter.get(), &kvi_cf0_0); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_0); - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 5 - ASSERT_EQ("PUT(A):a,PUT(A):aa,DEL(A),PUT(B):b,PUT(B):bb,PUT(C):cc,", - PrintContents(&batch, nullptr)); - - ASSERT_EQ( - "PUT(A):a1,DEL(A),DEL(B),PUT(B):b1,PUT(C):c1," - "PUT(E):e1,SINGLE-DEL(E),", - PrintContents(&batch, &cf1)); - - ASSERT_OK(batch.RollbackToSavePoint()); // rollback to 1 - ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); - - ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,", - PrintContents(&batch, &cf1)); - - s = batch.RollbackToSavePoint(); // no savepoint found + s = batch_->RollbackToSavePoint(); // no savepoint found ASSERT_TRUE(s.IsNotFound()); - ASSERT_EQ("PUT(A):a,PUT(A):aa,PUT(B):b,", PrintContents(&batch, nullptr)); + AssertItersEqual(cf0_iter.get(), &kvi_cf0_0); + AssertItersEqual(cf1_iter.get(), &kvi_cf1_0); - ASSERT_EQ("PUT(A):a1,DEL(B),PUT(C):c1,PUT(E):e1,", - PrintContents(&batch, &cf1)); + batch_->SetSavePoint(); // 6 - batch.SetSavePoint(); // 6 + batch_->Clear(); + ASSERT_EQ("", PrintContents(batch_.get(), nullptr)); + ASSERT_EQ("", PrintContents(batch_.get(), &cf1)); - batch.Clear(); - ASSERT_EQ("", PrintContents(&batch, nullptr)); - ASSERT_EQ("", PrintContents(&batch, &cf1)); - - s = batch.RollbackToSavePoint(); // rollback to 6 + s = batch_->RollbackToSavePoint(); // rollback to 6 ASSERT_TRUE(s.IsNotFound()); } -TEST_F(WriteBatchWithIndexTest, SingleDeleteTest) { - WriteBatchWithIndex batch; +TEST_P(WriteBatchWithIndexTest, SingleDeleteTest) { Status s; std::string value; - DBOptions db_options; - ASSERT_OK(batch.SingleDelete("A")); + ASSERT_OK(batch_->SingleDelete("A")); - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_TRUE(s.IsNotFound()); - value = PrintContents(&batch, nullptr); - ASSERT_EQ("SINGLE-DEL(A),", value); - batch.Clear(); - ASSERT_OK(batch.Put("A", "a")); - ASSERT_OK(batch.Put("A", "a2")); - ASSERT_OK(batch.Put("B", "b")); - ASSERT_OK(batch.SingleDelete("A")); + batch_->Clear(); + ASSERT_OK(batch_->Put("A", "a")); + ASSERT_OK(batch_->Put("A", "a2")); + ASSERT_OK(batch_->Put("B", "b")); + ASSERT_OK(batch_->SingleDelete("A")); - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_OK(s); ASSERT_EQ("b", value); - value = PrintContents(&batch, nullptr); - ASSERT_EQ("PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(B):b,", value); + ASSERT_OK(batch_->Put("C", "c")); + ASSERT_OK(batch_->Put("A", "a3")); + ASSERT_OK(batch_->Delete("B")); + ASSERT_OK(batch_->SingleDelete("B")); + ASSERT_OK(batch_->SingleDelete("C")); - ASSERT_OK(batch.Put("C", "c")); - ASSERT_OK(batch.Put("A", "a3")); - ASSERT_OK(batch.Delete("B")); - ASSERT_OK(batch.SingleDelete("B")); - ASSERT_OK(batch.SingleDelete("C")); - - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_OK(s); ASSERT_EQ("a3", value); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "C", &value); + s = batch_->GetFromBatch(options_, "C", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "D", &value); + s = batch_->GetFromBatch(options_, "D", &value); ASSERT_TRUE(s.IsNotFound()); - value = PrintContents(&batch, nullptr); - ASSERT_EQ( - "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,PUT(B):b,DEL(B),SINGLE-DEL(B)" - ",PUT(C):c,SINGLE-DEL(C),", - value); + ASSERT_OK(batch_->Put("B", "b4")); + ASSERT_OK(batch_->Put("C", "c4")); + ASSERT_OK(batch_->Put("D", "d4")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->Delete("A")); - ASSERT_OK(batch.Put("B", "b4")); - ASSERT_OK(batch.Put("C", "c4")); - ASSERT_OK(batch.Put("D", "d4")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.Delete("A")); - - s = batch.GetFromBatch(db_options, "A", &value); + s = batch_->GetFromBatch(options_, "A", &value); ASSERT_TRUE(s.IsNotFound()); - s = batch.GetFromBatch(db_options, "B", &value); + s = batch_->GetFromBatch(options_, "B", &value); ASSERT_OK(s); ASSERT_EQ("b4", value); - s = batch.GetFromBatch(db_options, "C", &value); + s = batch_->GetFromBatch(options_, "C", &value); ASSERT_OK(s); ASSERT_EQ("c4", value); - s = batch.GetFromBatch(db_options, "D", &value); + s = batch_->GetFromBatch(options_, "D", &value); ASSERT_TRUE(s.IsNotFound()); - - value = PrintContents(&batch, nullptr); - ASSERT_EQ( - "PUT(A):a,PUT(A):a2,SINGLE-DEL(A),PUT(A):a3,DEL(A),PUT(B):b,DEL(B)," - "SINGLE-DEL(B),PUT(B):b4,PUT(C):c,SINGLE-DEL(C),PUT(C):c4,PUT(D):d4," - "SINGLE-DEL(D),SINGLE-DEL(D),", - value); } -TEST_F(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) { +TEST_P(WriteBatchWithIndexTest, SingleDeleteDeltaIterTest) { std::string value; - DBOptions db_options; - WriteBatchWithIndex batch(BytewiseComparator(), 20, true /* overwrite_key */); - - ASSERT_OK(batch.Put("A", "a")); - ASSERT_OK(batch.Put("A", "a2")); - ASSERT_OK(batch.Put("B", "b")); - ASSERT_OK(batch.SingleDelete("A")); - ASSERT_OK(batch.Delete("B")); + ASSERT_OK(batch_->Put("A", "a")); + ASSERT_OK(batch_->Put("A", "a2")); + ASSERT_OK(batch_->Put("B", "b")); + ASSERT_OK(batch_->SingleDelete("A")); + ASSERT_OK(batch_->Delete("B")); KVMap map; - value = PrintContents(&batch, &map, nullptr); + value = PrintContents(batch_.get(), &map, nullptr); ASSERT_EQ("", value); map["A"] = "aa"; map["C"] = "cc"; map["D"] = "dd"; - ASSERT_OK(batch.SingleDelete("B")); - ASSERT_OK(batch.SingleDelete("C")); - ASSERT_OK(batch.SingleDelete("Z")); + ASSERT_OK(batch_->SingleDelete("B")); + ASSERT_OK(batch_->SingleDelete("C")); + ASSERT_OK(batch_->SingleDelete("Z")); - value = PrintContents(&batch, &map, nullptr); + value = PrintContents(batch_.get(), &map, nullptr); ASSERT_EQ("D:dd,", value); - ASSERT_OK(batch.Put("A", "a3")); - ASSERT_OK(batch.Put("B", "b3")); - ASSERT_OK(batch.SingleDelete("A")); - ASSERT_OK(batch.SingleDelete("A")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.SingleDelete("D")); - ASSERT_OK(batch.Delete("D")); + ASSERT_OK(batch_->Put("A", "a3")); + ASSERT_OK(batch_->Put("B", "b3")); + ASSERT_OK(batch_->SingleDelete("A")); + ASSERT_OK(batch_->SingleDelete("A")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->SingleDelete("D")); + ASSERT_OK(batch_->Delete("D")); map["E"] = "ee"; - value = PrintContents(&batch, &map, nullptr); + value = PrintContents(batch_.get(), &map, nullptr); ASSERT_EQ("B:b3,E:ee,", value); } +TEST_P(WriteBatchWithIndexTest, MultiGetTest) { + // MultiGet a lot of keys in order to force std::vector reallocations + std::vector keys; + for (int i = 0; i < 100; ++i) { + keys.emplace_back(std::to_string(i)); + } + + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + // Write some data to the db for the even numbered keys + { + WriteBatch wb; + for (size_t i = 1; i < keys.size(); ++i) { + std::string val = "val" + std::to_string(i); + ASSERT_OK(wb.Put(cf0, keys[i], val)); + } + ASSERT_OK(db_->Write(write_opts_, &wb)); + for (size_t i = 1; i < keys.size(); ++i) { + std::string value; + ASSERT_OK(db_->Get(read_opts_, cf0, keys[i], &value)); + } + } + + // Write some data to the batch + for (size_t i = 0; i < keys.size(); ++i) { + if ((i % 5) == 0) { + ASSERT_OK(batch_->Delete(cf0, keys[i])); + } else if ((i % 7) == 0) { + std::string val = "new" + std::to_string(i); + ASSERT_OK(batch_->Put(cf0, keys[i], val)); + } + if (i > 0 && (i % 3) == 0) { + ASSERT_OK(batch_->Merge(cf0, keys[i], "merge")); + } + } + + std::vector key_slices; + for (size_t i = 0; i < keys.size(); ++i) { + key_slices.emplace_back(keys[i]); + } + std::vector values(keys.size()); + std::vector statuses(keys.size()); + + batch_->MultiGetFromBatchAndDB(db_, read_opts_, cf0, key_slices.size(), + key_slices.data(), values.data(), + statuses.data(), false); + for (size_t i = 0; i < keys.size(); ++i) { + if (i == 0) { + ASSERT_TRUE(statuses[i].IsNotFound()); + } else if ((i % 3) == 0) { + ASSERT_OK(statuses[i]); + if ((i % 5) == 0) { // Merge after Delete + ASSERT_EQ(values[i], "merge"); + } else if ((i % 7) == 0) { // Merge after Put + std::string val = "new" + std::to_string(i); + ASSERT_EQ(values[i], val + ",merge"); + } else { + std::string val = "val" + std::to_string(i); + ASSERT_EQ(values[i], val + ",merge"); + } + } else if ((i % 5) == 0) { + ASSERT_TRUE(statuses[i].IsNotFound()); + } else if ((i % 7) == 0) { + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], "new" + std::to_string(i)); + } else { + ASSERT_OK(statuses[i]); + ASSERT_EQ(values[i], "val" + std::to_string(i)); + } + } +} + +// This test has merges, but the merge does not play into the final result +TEST_P(WriteBatchWithIndexTest, FakeMergeWithIteratorTest) { + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + // The map we are starting with + KVMap input = { + {"odm", "odm0"}, + {"omd", "omd0"}, + {"omp", "omp0"}, + }; + KVMap result = { + {"odm", "odm2"}, // Orig, Delete, Merge + {"mp", "mp1"}, // Merge, Put + {"omp", "omp2"}, // Origi, Merge, Put + {"mmp", "mmp2"} // Merge, Merge, Put + }; + + for (auto& iter : result) { + EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second); + } + AddToBatch(cf0, "md"); // Merge, Delete + AddToBatch(cf0, "mmd"); // Merge, Merge, Delete + AddToBatch(cf0, "omd"); // Orig, Merge, Delete + + KVIter kvi(&result); + // First try just the batch + std::unique_ptr iter( + batch_->NewIteratorWithBase(cf0, new KVIter(&input))); + AssertItersEqual(iter.get(), &kvi); +} + +TEST_P(WriteBatchWithIndexTest, IteratorMergeTest) { + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + KVMap result = { + {"m", "m0"}, // Merge + {"mm", "mm0,mm1"}, // Merge, Merge + {"dm", "dm1"}, // Delete, Merge + {"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge + {"mdm", "mdm2"}, // Merge, Delete, Merge + {"mpm", "mpm1,mpm2"}, // Merge, Put, Merge + {"pm", "pm0,pm1"}, // Put, Merge + {"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge + }; + + for (auto& iter : result) { + EXPECT_EQ(AddToBatch(cf0, iter.first), iter.second); + } + + KVIter kvi(&result); + // First try just the batch + KVMap empty_map; + std::unique_ptr iter( + batch_->NewIteratorWithBase(cf0, new KVIter(&empty_map))); + AssertItersEqual(iter.get(), &kvi); +} + +TEST_P(WriteBatchWithIndexTest, IteratorMergeTestWithOrig) { + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + KVMap original; + KVMap results = { + {"m", "om,m0"}, // Merge + {"mm", "omm,mm0,mm1"}, // Merge, Merge + {"dm", "dm1"}, // Delete, Merge + {"dmm", "dmm1,dmm2"}, // Delete, Merge, Merge + {"mdm", "mdm2"}, // Merge, Delete, Merge + {"mpm", "mpm1,mpm2"}, // Merge, Put, Merge + {"pm", "pm0,pm1"}, // Put, Merge + {"pmm", "pmm0,pmm1,pmm2"}, // Put, Merge, Merge + }; + + for (auto& iter : results) { + AddToBatch(cf0, iter.first); + original[iter.first] = "o" + iter.first; + } + + KVIter kvi(&results); + // First try just the batch + std::unique_ptr iter( + batch_->NewIteratorWithBase(cf0, new KVIter(&original))); + AssertItersEqual(iter.get(), &kvi); +} + +TEST_P(WriteBatchWithIndexTest, GetFromBatchAfterMerge) { + std::string value; + Status s; + + ASSERT_OK(OpenDB()); + ASSERT_OK(db_->Put(write_opts_, "o", "aa")); + batch_->Merge("o", "bb"); // Merging bb under key "o" + batch_->Merge("m", "cc"); // Merging bc under key "m" + s = batch_->GetFromBatch(options_, "m", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + s = batch_->GetFromBatch(options_, "o", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + + ASSERT_OK(db_->Write(write_opts_, batch_->GetWriteBatch())); + ASSERT_OK(db_->Get(read_opts_, "o", &value)); + ASSERT_EQ(value, "aa,bb"); + ASSERT_OK(db_->Get(read_opts_, "m", &value)); + ASSERT_EQ(value, "cc"); +} + +TEST_P(WriteBatchWithIndexTest, GetFromBatchAndDBAfterMerge) { + std::string value; + + ASSERT_OK(OpenDB()); + ASSERT_OK(db_->Put(write_opts_, "o", "aa")); + ASSERT_OK(batch_->Merge("o", "bb")); // Merging bb under key "o" + ASSERT_OK(batch_->Merge("m", "cc")); // Merging bc under key "m" + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "o", &value)); + ASSERT_EQ(value, "aa,bb"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "m", &value)); + ASSERT_EQ(value, "cc"); +} + +TEST_F(WBWIKeepTest, GetAfterPut) { + std::string value; + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + ASSERT_OK(db_->Put(write_opts_, "key", "orig")); + + ASSERT_OK(batch_->Put("key", "aa")); // Writing aa under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "aa"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa"); + + ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "aa,bb"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa,bb"); + + ASSERT_OK(batch_->Merge("key", "cc")); // Merging cc under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "aa,bb,cc"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa,bb,cc"); +} + +TEST_P(WriteBatchWithIndexTest, GetAfterMergePut) { + std::string value; + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + ASSERT_OK(db_->Put(write_opts_, "key", "orig")); + + ASSERT_OK(batch_->Merge("key", "aa")); // Merging aa under key + Status s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "orig,aa"); + + ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key + s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "orig,aa,bb"); + + ASSERT_OK(batch_->Put("key", "cc")); // Writing cc under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc"); + + ASSERT_OK(batch_->Merge("key", "dd")); // Merging dd under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); +} + +TEST_P(WriteBatchWithIndexTest, GetAfterMergeDelete) { + std::string value; + ASSERT_OK(OpenDB()); + ColumnFamilyHandle* cf0 = db_->DefaultColumnFamily(); + + ASSERT_OK(batch_->Merge("key", "aa")); // Merging aa under key + Status s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa"); + + ASSERT_OK(batch_->Merge("key", "bb")); // Merging bb under key + s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_EQ(s.code(), Status::Code::kMergeInProgress); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "aa,bb"); + + ASSERT_OK(batch_->Delete("key")); // Delete key from batch + s = batch_->GetFromBatch(cf0, options_, "key", &value); + ASSERT_TRUE(s.IsNotFound()); + s = batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value); + ASSERT_TRUE(s.IsNotFound()); + + ASSERT_OK(batch_->Merge("key", "cc")); // Merging cc under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc"); + ASSERT_OK(batch_->Merge("key", "dd")); // Merging dd under key + ASSERT_OK(batch_->GetFromBatch(cf0, options_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "key", &value)); + ASSERT_EQ(value, "cc,dd"); +} + +TEST_F(WBWIOverwriteTest, TestBadMergeOperator) { + class FailingMergeOperator : public MergeOperator { + public: + FailingMergeOperator() {} + + bool FullMergeV2(const MergeOperationInput& /*merge_in*/, + MergeOperationOutput* /*merge_out*/) const override { + return false; + } + + const char* Name() const override { return "Failing"; } + }; + options_.merge_operator.reset(new FailingMergeOperator()); + ASSERT_OK(OpenDB()); + + ColumnFamilyHandle* column_family = db_->DefaultColumnFamily(); + std::string value; + + ASSERT_OK(db_->Put(write_opts_, "a", "a0")); + ASSERT_OK(batch_->Put("b", "b0")); + + ASSERT_OK(batch_->Merge("a", "a1")); + ASSERT_NOK(batch_->GetFromBatchAndDB(db_, read_opts_, "a", &value)); + ASSERT_NOK(batch_->GetFromBatch(column_family, options_, "a", &value)); + ASSERT_OK(batch_->GetFromBatchAndDB(db_, read_opts_, "b", &value)); + ASSERT_OK(batch_->GetFromBatch(column_family, options_, "b", &value)); +} + +INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {