From 6f125998636a6ce245d662e59799b0cfa7a5ad56 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 22 Feb 2022 14:19:02 -0800 Subject: [PATCH] Support WBWI for keys having timestamps (#9603) Summary: This PR supports inserting keys into a `WriteBatchWithIndex` for column families that enable user-defined timestamps and reading the keys back. **The index does not have timestamps.** Writing a key to WBWI is unchanged, because the underlying WriteBatch already supports it. When reading the keys back, we need to make sure to distinguish between keys with and without timestamps before comparison. When the user calls `GetFromBatchAndDB()`, no timestamp is needed to query the batch, but a timestamp has to be provided to query the db. The assumption is that data in the batch must be newer than data from the db. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9603 Test Plan: make check Reviewed By: ltamasi Differential Revision: D34354849 Pulled By: riversand963 fbshipit-source-id: d25d1f84e2240ce543e521fa30595082fb8db9a0 --- HISTORY.md | 4 + db/write_batch.cc | 6 + db/write_batch_internal.h | 4 + db/write_batch_test.cc | 24 ++- .../utilities/write_batch_with_index.h | 1 + include/rocksdb/write_batch.h | 4 + utilities/transactions/transaction_base.cc | 4 +- .../write_batch_with_index.cc | 31 +++- .../write_batch_with_index_internal.cc | 51 ++++-- .../write_batch_with_index_internal.h | 31 +++- .../write_batch_with_index_test.cc | 147 +++++++++++++++++- 11 files changed, 277 insertions(+), 30 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f1908c56b..eb138935b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,4 +1,8 @@ # Rocksdb Change Log +## Unreleased +### New Features +* Allow WriteBatchWithIndex to index a WriteBatch that includes keys with user-defined timestamps. The index itself does not have timestamps. 
+ ## 7.0.0 (02/20/2022) ### Bug Fixes * Fixed a major bug in which batched MultiGet could return old values for keys deleted by DeleteRange when memtable Bloom filter is enabled (memtable_prefix_bloom_size_ratio > 0). (The fix includes a substantial MultiGet performance improvement in the unusual case of both memtable_whole_key_filtering and prefix_extractor.) diff --git a/db/write_batch.cc b/db/write_batch.cc index a8865c8b2..31e2db27f 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -800,6 +800,7 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, } needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; std::string dummy_ts(ts_sz, '\0'); std::array key_with_ts{{key, dummy_ts}}; return WriteBatchInternal::Put(this, cf_id, SliceParts(key_with_ts.data(), 2), @@ -812,6 +813,7 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, if (!s.ok()) { return s; } + has_key_with_ts_ = true; assert(column_family); uint32_t cf_id = column_family->GetID(); std::array key_with_ts{{key, ts}}; @@ -1002,6 +1004,7 @@ Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { } needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; std::string dummy_ts(ts_sz, '\0'); std::array key_with_ts{{key, dummy_ts}}; return WriteBatchInternal::Delete(this, cf_id, @@ -1015,6 +1018,7 @@ Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key, return s; } assert(column_family); + has_key_with_ts_ = true; uint32_t cf_id = column_family->GetID(); std::array key_with_ts{{key, ts}}; return WriteBatchInternal::Delete(this, cf_id, @@ -1115,6 +1119,7 @@ Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family, } needs_in_place_update_ts_ = true; + has_key_with_ts_ = true; std::string dummy_ts(ts_sz, '\0'); std::array key_with_ts{{key, dummy_ts}}; return WriteBatchInternal::SingleDelete(this, cf_id, @@ -1127,6 +1132,7 @@ Status WriteBatch::SingleDelete(ColumnFamilyHandle* 
column_family, if (!s.ok()) { return s; } + has_key_with_ts_ = true; assert(column_family); uint32_t cf_id = column_family->GetID(); std::array key_with_ts{{key, ts}}; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 321be6c4c..49abed74e 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -228,6 +228,10 @@ class WriteBatchInternal { static bool TimestampsUpdateNeeded(const WriteBatch& wb) { return wb.needs_in_place_update_ts_; } + + static bool HasKeyWithTimestamp(const WriteBatch& wb) { + return wb.has_key_with_ts_; + } }; // LocalSavePoint is similar to a scope guard diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 0ad9f7667..6e6ad5578 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -1009,6 +1009,29 @@ TEST_F(WriteBatchTest, UpdateTimestamps) { {4, cf4.GetComparator()}, {5, cf5.GetComparator()}}; + static constexpr size_t timestamp_size = sizeof(uint64_t); + + { + WriteBatch wb1, wb2, wb3, wb4, wb5, wb6, wb7; + ASSERT_OK(wb1.Put(&cf0, "key", "value")); + ASSERT_FALSE(WriteBatchInternal::HasKeyWithTimestamp(wb1)); + ASSERT_OK(wb2.Put(&cf4, "key", "value")); + ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb2)); + ASSERT_OK(wb3.Put(&cf4, "key", /*ts=*/std::string(timestamp_size, '\xfe'), + "value")); + ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb3)); + ASSERT_OK(wb4.Delete(&cf4, "key", + /*ts=*/std::string(timestamp_size, '\xfe'))); + ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb4)); + ASSERT_OK(wb5.Delete(&cf4, "key")); + ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb5)); + ASSERT_OK(wb6.SingleDelete(&cf4, "key")); + ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb6)); + ASSERT_OK(wb7.SingleDelete(&cf4, "key", + /*ts=*/std::string(timestamp_size, '\xfe'))); + ASSERT_TRUE(WriteBatchInternal::HasKeyWithTimestamp(wb7)); + } + WriteBatch batch; // Write to the batch. We will assign timestamps later. 
for (const auto& key_str : key_strs) { @@ -1017,7 +1040,6 @@ TEST_F(WriteBatchTest, UpdateTimestamps) { ASSERT_OK(batch.Put(&cf5, key_str, "value")); } - static constexpr size_t timestamp_size = sizeof(uint64_t); const auto checker1 = [](uint32_t cf) { if (cf == 4 || cf == 5) { return timestamp_size; diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 828a5c8d2..90174abaf 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -275,6 +275,7 @@ class WriteBatchWithIndex : public WriteBatchBase { friend class WritePreparedTxn; friend class WriteUnpreparedTxn; friend class WriteBatchWithIndex_SubBatchCnt_Test; + friend class WriteBatchWithIndexInternal; // Returns the number of sub-batches inside the write batch. A sub-batch // starts right before inserting a key that is a duplicate of a key in the // last sub-batch. diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 4c1462478..d39727fac 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -444,6 +444,10 @@ class WriteBatch : public WriteBatchBase { // timestamps to desired values. bool needs_in_place_update_ts_ = false; + // True if the write batch contains at least one key from a column family + // that enables user-defined timestamp. 
+ bool has_key_with_ts_ = false; + protected: std::string rep_; // See comment in write_batch.cc for the format of rep_ }; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 9775a1c1b..0149ffd50 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -125,7 +125,9 @@ Status TransactionBaseImpl::TryLock(ColumnFamilyHandle* column_family, void TransactionBaseImpl::SetSavePoint() { if (save_points_ == nullptr) { - save_points_.reset(new std::stack>()); + save_points_.reset( + new std::stack>()); } save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_, num_puts_, num_deletes_, num_merges_, diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 704549f17..3c22f41a8 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -26,7 +26,8 @@ namespace ROCKSDB_NAMESPACE { struct WriteBatchWithIndex::Rep { explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, size_t max_bytes = 0, bool _overwrite_key = false) - : write_batch(reserved_bytes, max_bytes), + : write_batch(reserved_bytes, max_bytes, /*protection_bytes_per_key=*/0, + index_comparator ? 
index_comparator->timestamp_size() : 0), comparator(index_comparator, &write_batch), skip_list(comparator, &arena), overwrite_key(_overwrite_key), @@ -144,9 +145,11 @@ void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { wb_data.size() - last_entry_offset); // Extract key Slice key; - bool success __attribute__((__unused__)); - success = + bool success = ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0); +#ifdef NDEBUG + (void)success; +#endif assert(success); auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); @@ -239,6 +242,7 @@ Status WriteBatchWithIndex::Rep::ReBuildIndex() { case kTypeBeginUnprepareXID: case kTypeEndPrepareXID: case kTypeCommitXID: + case kTypeCommitXIDAndTimestamp: case kTypeRollbackXID: case kTypeNoop: break; @@ -491,6 +495,12 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, Status WriteBatchWithIndex::GetFromBatchAndDB( DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* pinnable_val, ReadCallback* callback) { + const Comparator* const ucmp = rep->comparator.GetComparator(column_family); + size_t ts_sz = ucmp ? ucmp->timestamp_size() : 0; + if (ts_sz > 0 && !read_options.timestamp) { + return Status::InvalidArgument("Must specify timestamp"); + } + Status s; WriteBatchWithIndexInternal wbwii(db, column_family); @@ -555,6 +565,15 @@ void WriteBatchWithIndex::MultiGetFromBatchAndDB( DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const size_t num_keys, const Slice* keys, PinnableSlice* values, Status* statuses, bool sorted_input, ReadCallback* callback) { + const Comparator* const ucmp = rep->comparator.GetComparator(column_family); + size_t ts_sz = ucmp ? 
ucmp->timestamp_size() : 0; + if (ts_sz > 0 && !read_options.timestamp) { + for (size_t i = 0; i < num_keys; ++i) { + statuses[i] = Status::InvalidArgument("Must specify timestamp"); + } + return; + } + WriteBatchWithIndexInternal wbwii(db, column_family); autovector key_context; @@ -657,5 +676,11 @@ size_t WriteBatchWithIndex::GetDataSize() const { return rep->write_batch.GetDataSize(); } +const Comparator* WriteBatchWithIndexInternal::GetUserComparator( + const WriteBatchWithIndex& wbwi, uint32_t cf_id) { + const WriteBatchEntryComparator& ucmps = wbwi.rep->comparator; + return ucmps.GetComparator(cf_id); +} + } // namespace ROCKSDB_NAMESPACE #endif // !ROCKSDB_LITE diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc index ee893cdd9..297d0e706 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc @@ -33,6 +33,7 @@ BaseDeltaIterator::BaseDeltaIterator(ColumnFamilyHandle* column_family, comparator_(comparator), iterate_upper_bound_(read_options ? 
read_options->iterate_upper_bound : nullptr) { + assert(comparator_); wbwii_.reset(new WriteBatchWithIndexInternal(column_family)); } @@ -95,8 +96,9 @@ void BaseDeltaIterator::Next() { AdvanceBase(); } if (DeltaValid() && BaseValid()) { - if (comparator_->Equal(delta_iterator_->Entry().key, - base_iterator_->key())) { + if (0 == comparator_->CompareWithoutTimestamp( + delta_iterator_->Entry().key, /*a_has_ts=*/false, + base_iterator_->key(), /*b_has_ts=*/false)) { equal_keys_ = true; } } @@ -131,8 +133,9 @@ void BaseDeltaIterator::Prev() { AdvanceBase(); } if (DeltaValid() && BaseValid()) { - if (comparator_->Equal(delta_iterator_->Entry().key, - base_iterator_->key())) { + if (0 == comparator_->CompareWithoutTimestamp( + delta_iterator_->Entry().key, /*a_has_ts=*/false, + base_iterator_->key(), /*b_has_ts=*/false)) { equal_keys_ = true; } } @@ -218,8 +221,9 @@ void BaseDeltaIterator::AssertInvariants() { // we don't support those yet assert(delta_iterator_->Entry().type != kMergeRecord && delta_iterator_->Entry().type != kLogDataRecord); - int compare = - comparator_->Compare(delta_iterator_->Entry().key, base_iterator_->key()); + int compare = comparator_->CompareWithoutTimestamp( + delta_iterator_->Entry().key, /*a_has_ts=*/false, base_iterator_->key(), + /*b_has_ts=*/false); if (forward_) { // current_at_base -> compare < 0 assert(!current_at_base_ || compare < 0); @@ -301,7 +305,9 @@ void BaseDeltaIterator::UpdateCurrent() { return; } if (iterate_upper_bound_) { - if (comparator_->Compare(delta_entry.key, *iterate_upper_bound_) >= 0) { + if (comparator_->CompareWithoutTimestamp( + delta_entry.key, /*a_has_ts=*/false, *iterate_upper_bound_, + /*b_has_ts=*/false) >= 0) { // out of upper bound -> finished. return; } @@ -319,8 +325,9 @@ void BaseDeltaIterator::UpdateCurrent() { return; } else { int compare = - (forward_ ? 1 : -1) * - comparator_->Compare(delta_entry.key, base_iterator_->key()); + (forward_ ? 
1 : -1) * comparator_->CompareWithoutTimestamp( + delta_entry.key, /*a_has_ts=*/false, + base_iterator_->key(), /*b_has_ts=*/false); if (compare <= 0) { // delta bigger or equal if (compare == 0) { equal_keys_ = true; @@ -572,12 +579,28 @@ int WriteBatchEntryComparator::CompareKey(uint32_t column_family, const Slice& key2) const { if (column_family < cf_comparators_.size() && cf_comparators_[column_family] != nullptr) { - return cf_comparators_[column_family]->Compare(key1, key2); + return cf_comparators_[column_family]->CompareWithoutTimestamp( + key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false); } else { - return default_comparator_->Compare(key1, key2); + return default_comparator_->CompareWithoutTimestamp( + key1, /*a_has_ts=*/false, key2, /*b_has_ts=*/false); } } +const Comparator* WriteBatchEntryComparator::GetComparator( + const ColumnFamilyHandle* column_family) const { + return column_family ? column_family->GetComparator() : default_comparator_; +} + +const Comparator* WriteBatchEntryComparator::GetComparator( + uint32_t column_family) const { + if (column_family < cf_comparators_.size() && + cf_comparators_[column_family]) { + return cf_comparators_[column_family]; + } + return default_comparator_; +} + WriteEntry WBWIIteratorImpl::Entry() const { WriteEntry ret; Slice blob, xid; @@ -591,6 +614,12 @@ WriteEntry WBWIIteratorImpl::Entry() const { assert(ret.type == kPutRecord || ret.type == kDeleteRecord || ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord || ret.type == kMergeRecord); + // Make sure entry.key does not include user-defined timestamp. 
+ const Comparator* const ucmp = comparator_->GetComparator(column_family_id_); + size_t ts_sz = ucmp->timestamp_size(); + if (ts_sz > 0) { + ret.key = StripTimestampFromUserKey(ret.key, ts_sz); + } return ret; } diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.h b/utilities/write_batch_with_index/write_batch_with_index_internal.h index 4a5a5bc18..cf8c46e5c 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_internal.h +++ b/utilities/write_batch_with_index/write_batch_with_index_internal.h @@ -137,8 +137,11 @@ struct WriteBatchIndexEntry { class ReadableWriteBatch : public WriteBatch { public: - explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0) - : WriteBatch(reserved_bytes, max_bytes) {} + explicit ReadableWriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0, + size_t protection_bytes_per_key = 0, + size_t default_cf_ts_sz = 0) + : WriteBatch(reserved_bytes, max_bytes, protection_bytes_per_key, + default_cf_ts_sz) {} // Retrieve some information from a write entry in the write batch, given // the start offset of the write entry. 
Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, @@ -168,10 +171,15 @@ class WriteBatchEntryComparator { const Comparator* default_comparator() { return default_comparator_; } + const Comparator* GetComparator( + const ColumnFamilyHandle* column_family) const; + + const Comparator* GetComparator(uint32_t column_family) const; + private: - const Comparator* default_comparator_; + const Comparator* const default_comparator_; std::vector cf_comparators_; - const ReadableWriteBatch* write_batch_; + const ReadableWriteBatch* const write_batch_; }; using WriteBatchEntrySkipList = @@ -179,7 +187,13 @@ using WriteBatchEntrySkipList = class WBWIIteratorImpl : public WBWIIterator { public: - enum Result { kFound, kDeleted, kNotFound, kMergeInProgress, kError }; + enum Result : uint8_t { + kFound, + kDeleted, + kNotFound, + kMergeInProgress, + kError + }; WBWIIteratorImpl(uint32_t column_family_id, WriteBatchEntrySkipList* skip_list, const ReadableWriteBatch* write_batch, @@ -251,9 +265,9 @@ class WBWIIteratorImpl : public WBWIIterator { bool MatchesKey(uint32_t cf_id, const Slice& key); - // Moves the to first entry of the previous key. + // Moves the iterator to first entry of the previous key. void PrevKey(); - // Moves the to first entry of the next key. + // Moves the iterator to first entry of the next key. 
void NextKey(); // Moves the iterator to the Update (Put or Delete) for the current key @@ -280,6 +294,9 @@ class WBWIIteratorImpl : public WBWIIterator { class WriteBatchWithIndexInternal { public: + static const Comparator* GetUserComparator(const WriteBatchWithIndex& wbwi, + uint32_t cf_id); + // For GetFromBatchAndDB or similar explicit WriteBatchWithIndexInternal(DB* db, ColumnFamilyHandle* column_family); diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 271084ad0..f03933823 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -17,6 +17,7 @@ #include "db/column_family.h" #include "port/stack_trace.h" #include "test_util/testharness.h" +#include "test_util/testutil.h" #include "util/random.h" #include "util/string_util.h" #include "utilities/merge_operators.h" @@ -116,7 +117,8 @@ class KVIter : public Iterator { }; static std::string PrintContents(WriteBatchWithIndex* batch, - ColumnFamilyHandle* column_family) { + ColumnFamilyHandle* column_family, + bool hex = false) { std::string result; WBWIIterator* iter; @@ -132,22 +134,22 @@ static std::string PrintContents(WriteBatchWithIndex* batch, if (e.type == kPutRecord) { result.append("PUT("); - result.append(e.key.ToString()); + result.append(e.key.ToString(hex)); result.append("):"); - result.append(e.value.ToString()); + result.append(e.value.ToString(hex)); } else if (e.type == kMergeRecord) { result.append("MERGE("); - result.append(e.key.ToString()); + result.append(e.key.ToString(hex)); result.append("):"); - result.append(e.value.ToString()); + result.append(e.value.ToString(hex)); } else if (e.type == kSingleDeleteRecord) { result.append("SINGLE-DEL("); - result.append(e.key.ToString()); + result.append(e.key.ToString(hex)); result.append(")"); } else { assert(e.type == kDeleteRecord); result.append("DEL("); - 
result.append(e.key.ToString()); + result.append(e.key.ToString(hex)); result.append(")"); } @@ -2254,6 +2256,137 @@ TEST_F(WBWIOverwriteTest, TestBadMergeOperator) { ASSERT_OK(batch_->GetFromBatch(column_family, options_, "b", &value)); } +TEST_P(WriteBatchWithIndexTest, ColumnFamilyWithTimestamp) { + ColumnFamilyHandleImplDummy cf2(2, + test::BytewiseComparatorWithU64TsWrapper()); + + // Sanity checks + ASSERT_TRUE(batch_->Put(&cf2, "key", "ts", "value").IsNotSupported()); + ASSERT_TRUE(batch_->Put(/*column_family=*/nullptr, "key", "ts", "value") + .IsInvalidArgument()); + ASSERT_TRUE(batch_->Delete(&cf2, "key", "ts").IsNotSupported()); + ASSERT_TRUE(batch_->Delete(/*column_family=*/nullptr, "key", "ts") + .IsInvalidArgument()); + ASSERT_TRUE(batch_->SingleDelete(&cf2, "key", "ts").IsNotSupported()); + ASSERT_TRUE(batch_->SingleDelete(/*column_family=*/nullptr, "key", "ts") + .IsInvalidArgument()); + { + std::string value; + ASSERT_TRUE(batch_ + ->GetFromBatchAndDB( + /*db=*/nullptr, ReadOptions(), &cf2, "key", &value) + .IsInvalidArgument()); + } + { + constexpr size_t num_keys = 2; + std::array keys{{Slice(), Slice()}}; + std::array pinnable_vals{ + {PinnableSlice(), PinnableSlice()}}; + std::array statuses{{Status(), Status()}}; + constexpr bool sorted_input = false; + batch_->MultiGetFromBatchAndDB(/*db=*/nullptr, ReadOptions(), &cf2, + num_keys, keys.data(), pinnable_vals.data(), + statuses.data(), sorted_input); + for (const auto& s : statuses) { + ASSERT_TRUE(s.IsInvalidArgument()); + } + } + + constexpr uint32_t kMaxKey = 10; + + const auto ts_sz_lookup = [&cf2](uint32_t id) { + if (cf2.GetID() == id) { + return sizeof(uint64_t); + } else { + return std::numeric_limits::max(); + } + }; + + // Put keys + for (uint32_t i = 0; i < kMaxKey; ++i) { + std::string key; + PutFixed32(&key, i); + Status s = batch_->Put(&cf2, key, "value" + std::to_string(i)); + ASSERT_OK(s); + } + + WriteBatch* wb = batch_->GetWriteBatch(); + assert(wb); + ASSERT_OK( + 
wb->UpdateTimestamps(std::string(sizeof(uint64_t), '\0'), ts_sz_lookup)); + + // Point lookup + for (uint32_t i = 0; i < kMaxKey; ++i) { + std::string value; + std::string key; + PutFixed32(&key, i); + Status s = batch_->GetFromBatch(&cf2, Options(), key, &value); + ASSERT_OK(s); + ASSERT_EQ("value" + std::to_string(i), value); + } + + // Iterator + { + std::unique_ptr it(batch_->NewIterator(&cf2)); + uint32_t start = 0; + for (it->SeekToFirst(); it->Valid(); it->Next(), ++start) { + std::string key; + PutFixed32(&key, start); + ASSERT_OK(it->status()); + ASSERT_EQ(key, it->Entry().key); + ASSERT_EQ("value" + std::to_string(start), it->Entry().value); + ASSERT_EQ(WriteType::kPutRecord, it->Entry().type); + } + ASSERT_EQ(kMaxKey, start); + } + + // Delete the keys with Delete() or SingleDelete() + for (uint32_t i = 0; i < kMaxKey; ++i) { + std::string key; + PutFixed32(&key, i); + Status s; + if (0 == (i % 2)) { + s = batch_->Delete(&cf2, key); + } else { + s = batch_->SingleDelete(&cf2, key); + } + ASSERT_OK(s); + } + + ASSERT_OK(wb->UpdateTimestamps(std::string(sizeof(uint64_t), '\xfe'), + ts_sz_lookup)); + + for (uint32_t i = 0; i < kMaxKey; ++i) { + std::string value; + std::string key; + PutFixed32(&key, i); + Status s = batch_->GetFromBatch(&cf2, Options(), key, &value); + ASSERT_TRUE(s.IsNotFound()); + } + + // Iterator + { + const bool overwrite = GetParam(); + std::unique_ptr it(batch_->NewIterator(&cf2)); + uint32_t start = 0; + for (it->SeekToFirst(); it->Valid(); it->Next(), ++start) { + std::string key; + PutFixed32(&key, start); + ASSERT_EQ(key, it->Entry().key); + if (!overwrite) { + ASSERT_EQ(WriteType::kPutRecord, it->Entry().type); + it->Next(); + ASSERT_TRUE(it->Valid()); + } + if (0 == (start % 2)) { + ASSERT_EQ(WriteType::kDeleteRecord, it->Entry().type); + } else { + ASSERT_EQ(WriteType::kSingleDeleteRecord, it->Entry().type); + } + } + } +} + INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); } // namespace 
ROCKSDB_NAMESPACE