From f441b273ae69124455e0ce2341c09d87ca94aed3 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 10 Oct 2014 13:31:28 -0700 Subject: [PATCH] WriteBatchWithIndex to support an option to overwrite rows when operating the same key Summary: With a new option, when accepting a new key, WriteBatchWithIndex will find an existing index of the same key, and replace the content of it. Test Plan: Add a unit test case. Reviewers: ljin, yhchiang, rven, igor Reviewed By: igor Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D24753 --- HISTORY.md | 1 + .../utilities/write_batch_with_index.h | 10 +- .../write_batch_with_index.cc | 147 ++++++++++++------ .../write_batch_with_index_test.cc | 76 +++++++++ 4 files changed, 183 insertions(+), 51 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 06660a0e8..b72bce080 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ ### Public API changes * Introduce 4 new convenient functions for converting Options from string: GetColumnFamilyOptionsFromMap(), GetColumnFamilyOptionsFromString(), GetDBOptionsFromMap(), GetDBOptionsFromString() +* Remove WriteBatchWithIndex.Delete() overloads using SliceParts ## 3.6.0 (10/7/2014) diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 85c80850f..4aafacaf5 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -19,7 +19,6 @@ namespace rocksdb { class ColumnFamilyHandle; -struct SliceParts; class Comparator; enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord }; @@ -62,9 +61,12 @@ class WriteBatchWithIndex { // interface, or we can't find a column family from the column family handle // passed in, backup_index_comparator will be used for the column family. 
// reserved_bytes: reserved bytes in underlying WriteBatch + // overwrite_key: if true, overwrite the key in the index when inserting + // the same key as previously, so iterator will never + // show two entries with the same key. explicit WriteBatchWithIndex( const Comparator* backup_index_comparator = BytewiseComparator(), - size_t reserved_bytes = 0); + size_t reserved_bytes = 0, bool overwrite_key = false); virtual ~WriteBatchWithIndex(); WriteBatch* GetWriteBatch(); @@ -84,10 +86,6 @@ class WriteBatchWithIndex { virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key); virtual void Delete(const Slice& key); - virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); - - virtual void Delete(const SliceParts& key); - // Create an iterator of a column family. User can call iterator.Seek() to // search to the next entry of or after a key. Keys will be iterated in the // order given by index_comparator. For multiple updates on the same key, diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 481ec6867..8cc5686f6 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -45,6 +45,9 @@ class WriteBatchEntryComparator { int operator()(const WriteBatchIndexEntry* entry1, const WriteBatchIndexEntry* entry2) const; + int CompareKey(uint32_t column_family, const Slice& key1, + const Slice& key2) const; + void SetComparatorForCF(uint32_t column_family_id, const Comparator* comparator) { cf_comparator_map_[column_family_id] = comparator; @@ -89,6 +92,10 @@ class WBWIIteratorImpl : public WBWIIterator { virtual Status status() const override { return status_; } + const WriteBatchIndexEntry* GetRawEntry() const { + return skip_list_iter_.key(); + } + private: uint32_t column_family_id_; WriteBatchEntrySkipList::Iterator skip_list_iter_; @@ -123,32 +130,90 @@ class 
WBWIIteratorImpl : public WBWIIterator { }; struct WriteBatchWithIndex::Rep { - Rep(const Comparator* index_comparator, size_t reserved_bytes = 0) + Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, + bool overwrite_key = false) : write_batch(reserved_bytes), comparator(index_comparator, &write_batch), - skip_list(comparator, &arena) {} + skip_list(comparator, &arena), + overwrite_key(overwrite_key), + last_entry_offset(0) {} ReadableWriteBatch write_batch; WriteBatchEntryComparator comparator; Arena arena; WriteBatchEntrySkipList skip_list; + bool overwrite_key; + size_t last_entry_offset; - WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) { + // Remember current offset of internal write batch, which is used as + // the starting offset of the next record. + void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); } + + // In overwrite mode, find the existing entry for the same key and update it + // to point to the current entry. + // Return true if the key is found and updated. + bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key); + bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key); + + // Add the recent entry to the update. + // In overwrite mode, if key already exists in the index, update it. + void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key); + void AddOrUpdateIndex(const Slice& key); + + // Allocate an index entry pointing to the last entry in the write batch and + // put it to skip list. 
+ void AddNewEntry(uint32_t column_family_id); +}; + +bool WriteBatchWithIndex::Rep::UpdateExistingEntry( + ColumnFamilyHandle* column_family, const Slice& key) { + uint32_t cf_id = GetColumnFamilyID(column_family); + return UpdateExistingEntryWithCfId(cf_id, key); +} + +bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( + uint32_t column_family_id, const Slice& key) { + if (!overwrite_key) { + return false; + } + + WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch); + iter.Seek(key); + if (!iter.Valid()) { + return false; + } + if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) { + return false; + } + WriteBatchIndexEntry* non_const_entry = + const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry()); + non_const_entry->offset = last_entry_offset; + return true; +} + +void WriteBatchWithIndex::Rep::AddOrUpdateIndex( + ColumnFamilyHandle* column_family, const Slice& key) { + if (!UpdateExistingEntry(column_family, key)) { uint32_t cf_id = GetColumnFamilyID(column_family); const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); if (cf_cmp != nullptr) { comparator.SetComparatorForCF(cf_id, cf_cmp); } - - return GetEntryWithCfId(cf_id); + AddNewEntry(cf_id); } +} - WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) { +void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { + if (!UpdateExistingEntryWithCfId(0, key)) { + AddNewEntry(0); + } +} + +void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); - auto* index_entry = new (mem) - WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id); - return index_entry; + auto* index_entry = + new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id); + skip_list.Insert(index_entry); } -}; Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, @@ -191,8 +256,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, }
WriteBatchWithIndex::WriteBatchWithIndex( - const Comparator* default_index_comparator, size_t reserved_bytes) - : rep(new Rep(default_index_comparator, reserved_bytes)) {} + const Comparator* default_index_comparator, size_t reserved_bytes, + bool overwrite_key) + : rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {} WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; } @@ -210,28 +276,28 @@ WBWIIterator* WriteBatchWithIndex::NewIterator( void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - auto* index_entry = rep->GetEntry(column_family); + rep->SetLastEntryOffset(); rep->write_batch.Put(column_family, key, value); - rep->skip_list.Insert(index_entry); + rep->AddOrUpdateIndex(column_family, key); } void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { - auto* index_entry = rep->GetEntryWithCfId(0); + rep->SetLastEntryOffset(); rep->write_batch.Put(key, value); - rep->skip_list.Insert(index_entry); + rep->AddOrUpdateIndex(key); } void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - auto* index_entry = rep->GetEntry(column_family); + rep->SetLastEntryOffset(); rep->write_batch.Merge(column_family, key, value); - rep->skip_list.Insert(index_entry); + rep->AddOrUpdateIndex(column_family, key); } void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { - auto* index_entry = rep->GetEntryWithCfId(0); + rep->SetLastEntryOffset(); rep->write_batch.Merge(key, value); - rep->skip_list.Insert(index_entry); + rep->AddOrUpdateIndex(key); } void WriteBatchWithIndex::PutLogData(const Slice& blob) { @@ -240,28 +306,15 @@ void WriteBatchWithIndex::PutLogData(const Slice& blob) { void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, const Slice& key) { - auto* index_entry = rep->GetEntry(column_family); + rep->SetLastEntryOffset(); rep->write_batch.Delete(column_family, key); - 
rep->skip_list.Insert(index_entry); + rep->AddOrUpdateIndex(column_family, key); } void WriteBatchWithIndex::Delete(const Slice& key) { - auto* index_entry = rep->GetEntryWithCfId(0); + rep->SetLastEntryOffset(); rep->write_batch.Delete(key); - rep->skip_list.Insert(index_entry); -} - -void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, - const SliceParts& key) { - auto* index_entry = rep->GetEntry(column_family); - rep->write_batch.Delete(column_family, key); - rep->skip_list.Insert(index_entry); -} - -void WriteBatchWithIndex::Delete(const SliceParts& key) { - auto* index_entry = rep->GetEntryWithCfId(0); - rep->write_batch.Delete(key); - rep->skip_list.Insert(index_entry); + rep->AddOrUpdateIndex(key); } int WriteBatchEntryComparator::operator()( @@ -298,14 +351,7 @@ int WriteBatchEntryComparator::operator()( key2 = *(entry2->search_key); } - int cmp; - auto comparator_for_cf = cf_comparator_map_.find(entry1->column_family); - if (comparator_for_cf != cf_comparator_map_.end()) { - cmp = comparator_for_cf->second->Compare(key1, key2); - } else { - cmp = default_comparator_->Compare(key1, key2); - } - + int cmp = CompareKey(entry1->column_family, key1, key2); if (cmp != 0) { return cmp; } else if (entry1->offset > entry2->offset) { @@ -316,4 +362,15 @@ int WriteBatchEntryComparator::operator()( return 0; } +int WriteBatchEntryComparator::CompareKey(uint32_t column_family, + const Slice& key1, + const Slice& key2) const { + auto comparator_for_cf = cf_comparator_map_.find(column_family); + if (comparator_for_cf != cf_comparator_map_.end()) { + return comparator_for_cf->second->Compare(key1, key2); + } else { + return default_comparator_->Compare(key1, key2); + } +} + } // namespace rocksdb diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 1152c7b88..b3dbdaa68 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ 
b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -318,6 +318,82 @@ TEST(WriteBatchWithIndexTest, TestComparatorForCF) { } } +TEST(WriteBatchWithIndexTest, TestOverwriteKey) { + ColumnFamilyHandleImplDummy cf1(6, nullptr); + ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + batch.Put(&cf1, "ddd", ""); + batch.Merge(&cf1, "ddd", ""); + batch.Delete(&cf1, "ddd"); + batch.Put(&cf2, "aaa", ""); + batch.Delete(&cf2, "aaa"); + batch.Put(&cf2, "aaa", "aaa"); + batch.Put(&cf2, "eee", "eee"); + batch.Put(&cf1, "ccc", ""); + batch.Put(&reverse_cf, "a11", ""); + batch.Delete(&cf1, "ccc"); + batch.Put(&reverse_cf, "a33", "a33"); + batch.Put(&reverse_cf, "a11", "a11"); + batch.Delete(&reverse_cf, "a33"); + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ccc", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("ddd", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2)); + iter->Seek(""); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("aaa", iter->Entry().key.ToString()); + ASSERT_EQ("aaa", iter->Entry().value.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("eee", iter->Entry().key.ToString()); + ASSERT_EQ("eee", iter->Entry().value.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + { + std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf)); + iter->Seek(""); + ASSERT_OK(iter->status()); + 
ASSERT_TRUE(!iter->Valid()); + + iter->Seek("z"); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a33", iter->Entry().key.ToString()); + ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ("a11", iter->Entry().key.ToString()); + ASSERT_EQ("a11", iter->Entry().value.ToString()); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } +} + } // namespace int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }