WriteBatchWithIndex to support an option to overwrite rows when operating the same key
Summary: With a new option, when accepting a new key, WriteBatchWithIndex will find an existing index of the same key, and replace the content of it. Test Plan: Add a unit test case. Reviewers: ljin, yhchiang, rven, igor Reviewed By: igor Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D24753
This commit is contained in:
parent
3ead857a0d
commit
f441b273ae
@ -4,6 +4,7 @@
|
|||||||
|
|
||||||
### Public API changes
|
### Public API changes
|
||||||
* Introduce 4 new convenient functions for converting Options from string: GetColumnFamilyOptionsFromMap(), GetColumnFamilyOptionsFromString(), GetDBOptionsFromMap(), GetDBOptionsFromString()
|
* Introduce 4 new convenient functions for converting Options from string: GetColumnFamilyOptionsFromMap(), GetColumnFamilyOptionsFromString(), GetDBOptionsFromMap(), GetDBOptionsFromString()
|
||||||
|
* Remove WriteBatchWithIndex.Delete() overloads using SliceParts
|
||||||
|
|
||||||
|
|
||||||
## 3.6.0 (10/7/2014)
|
## 3.6.0 (10/7/2014)
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
class ColumnFamilyHandle;
|
class ColumnFamilyHandle;
|
||||||
struct SliceParts;
|
|
||||||
class Comparator;
|
class Comparator;
|
||||||
|
|
||||||
enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord };
|
enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord };
|
||||||
@ -62,9 +61,12 @@ class WriteBatchWithIndex {
|
|||||||
// interface, or we can't find a column family from the column family handle
|
// interface, or we can't find a column family from the column family handle
|
||||||
// passed in, backup_index_comparator will be used for the column family.
|
// passed in, backup_index_comparator will be used for the column family.
|
||||||
// reserved_bytes: reserved bytes in underlying WriteBatch
|
// reserved_bytes: reserved bytes in underlying WriteBatch
|
||||||
|
// overwrite_key: if true, overwrite the key in the index when inserting
|
||||||
|
// the same key as previously, so iterator will never
|
||||||
|
// show two entries with the same key.
|
||||||
explicit WriteBatchWithIndex(
|
explicit WriteBatchWithIndex(
|
||||||
const Comparator* backup_index_comparator = BytewiseComparator(),
|
const Comparator* backup_index_comparator = BytewiseComparator(),
|
||||||
size_t reserved_bytes = 0);
|
size_t reserved_bytes = 0, bool overwrite_key = false);
|
||||||
virtual ~WriteBatchWithIndex();
|
virtual ~WriteBatchWithIndex();
|
||||||
|
|
||||||
WriteBatch* GetWriteBatch();
|
WriteBatch* GetWriteBatch();
|
||||||
@ -84,10 +86,6 @@ class WriteBatchWithIndex {
|
|||||||
virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key);
|
virtual void Delete(ColumnFamilyHandle* column_family, const Slice& key);
|
||||||
virtual void Delete(const Slice& key);
|
virtual void Delete(const Slice& key);
|
||||||
|
|
||||||
virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key);
|
|
||||||
|
|
||||||
virtual void Delete(const SliceParts& key);
|
|
||||||
|
|
||||||
// Create an iterator of a column family. User can call iterator.Seek() to
|
// Create an iterator of a column family. User can call iterator.Seek() to
|
||||||
// search to the next entry of or after a key. Keys will be iterated in the
|
// search to the next entry of or after a key. Keys will be iterated in the
|
||||||
// order given by index_comparator. For multiple updates on the same key,
|
// order given by index_comparator. For multiple updates on the same key,
|
||||||
|
@ -45,6 +45,9 @@ class WriteBatchEntryComparator {
|
|||||||
int operator()(const WriteBatchIndexEntry* entry1,
|
int operator()(const WriteBatchIndexEntry* entry1,
|
||||||
const WriteBatchIndexEntry* entry2) const;
|
const WriteBatchIndexEntry* entry2) const;
|
||||||
|
|
||||||
|
int CompareKey(uint32_t column_family, const Slice& key1,
|
||||||
|
const Slice& key2) const;
|
||||||
|
|
||||||
void SetComparatorForCF(uint32_t column_family_id,
|
void SetComparatorForCF(uint32_t column_family_id,
|
||||||
const Comparator* comparator) {
|
const Comparator* comparator) {
|
||||||
cf_comparator_map_[column_family_id] = comparator;
|
cf_comparator_map_[column_family_id] = comparator;
|
||||||
@ -89,6 +92,10 @@ class WBWIIteratorImpl : public WBWIIterator {
|
|||||||
|
|
||||||
virtual Status status() const override { return status_; }
|
virtual Status status() const override { return status_; }
|
||||||
|
|
||||||
|
const WriteBatchIndexEntry* GetRawEntry() const {
|
||||||
|
return skip_list_iter_.key();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
uint32_t column_family_id_;
|
uint32_t column_family_id_;
|
||||||
WriteBatchEntrySkipList::Iterator skip_list_iter_;
|
WriteBatchEntrySkipList::Iterator skip_list_iter_;
|
||||||
@ -123,32 +130,90 @@ class WBWIIteratorImpl : public WBWIIterator {
|
|||||||
};
|
};
|
||||||
|
|
||||||
struct WriteBatchWithIndex::Rep {
|
struct WriteBatchWithIndex::Rep {
|
||||||
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0)
|
Rep(const Comparator* index_comparator, size_t reserved_bytes = 0,
|
||||||
|
bool overwrite_key = false)
|
||||||
: write_batch(reserved_bytes),
|
: write_batch(reserved_bytes),
|
||||||
comparator(index_comparator, &write_batch),
|
comparator(index_comparator, &write_batch),
|
||||||
skip_list(comparator, &arena) {}
|
skip_list(comparator, &arena),
|
||||||
|
overwrite_key(overwrite_key),
|
||||||
|
last_entry_offset(0) {}
|
||||||
ReadableWriteBatch write_batch;
|
ReadableWriteBatch write_batch;
|
||||||
WriteBatchEntryComparator comparator;
|
WriteBatchEntryComparator comparator;
|
||||||
Arena arena;
|
Arena arena;
|
||||||
WriteBatchEntrySkipList skip_list;
|
WriteBatchEntrySkipList skip_list;
|
||||||
|
bool overwrite_key;
|
||||||
|
size_t last_entry_offset;
|
||||||
|
|
||||||
WriteBatchIndexEntry* GetEntry(ColumnFamilyHandle* column_family) {
|
// Remember current offset of internal write batch, which is used as
|
||||||
|
// the starting offset of the next record.
|
||||||
|
void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); }
|
||||||
|
|
||||||
|
// In overwrite mode, find the existing entry for the same key and update it
|
||||||
|
// to point to the current entry.
|
||||||
|
// Return true if the key is found and updated.
|
||||||
|
bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key);
|
||||||
|
bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key);
|
||||||
|
|
||||||
|
// Add the recent entry to the update.
|
||||||
|
// In overwrite mode, if key already exists in the index, update it.
|
||||||
|
void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key);
|
||||||
|
void AddOrUpdateIndex(const Slice& key);
|
||||||
|
|
||||||
|
// Allocate an index entry pointing to the last entry in the write batch and
|
||||||
|
// put it to skip list.
|
||||||
|
void AddNewEntry(uint32_t column_family_id);
|
||||||
|
};
|
||||||
|
|
||||||
|
bool WriteBatchWithIndex::Rep::UpdateExistingEntry(
|
||||||
|
ColumnFamilyHandle* column_family, const Slice& key) {
|
||||||
|
uint32_t cf_id = GetColumnFamilyID(column_family);
|
||||||
|
return UpdateExistingEntryWithCfId(cf_id, key);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId(
|
||||||
|
uint32_t column_family_id, const Slice& key) {
|
||||||
|
if (!overwrite_key) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch);
|
||||||
|
iter.Seek(key);
|
||||||
|
if (!iter.Valid()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
WriteBatchIndexEntry* non_const_entry =
|
||||||
|
const_cast<WriteBatchIndexEntry*>(iter.GetRawEntry());
|
||||||
|
non_const_entry->offset = last_entry_offset;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(
|
||||||
|
ColumnFamilyHandle* column_family, const Slice& key) {
|
||||||
|
if (!UpdateExistingEntry(column_family, key)) {
|
||||||
uint32_t cf_id = GetColumnFamilyID(column_family);
|
uint32_t cf_id = GetColumnFamilyID(column_family);
|
||||||
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
|
const auto* cf_cmp = GetColumnFamilyUserComparator(column_family);
|
||||||
if (cf_cmp != nullptr) {
|
if (cf_cmp != nullptr) {
|
||||||
comparator.SetComparatorForCF(cf_id, cf_cmp);
|
comparator.SetComparatorForCF(cf_id, cf_cmp);
|
||||||
}
|
}
|
||||||
|
AddNewEntry(cf_id);
|
||||||
return GetEntryWithCfId(cf_id);
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
WriteBatchIndexEntry* GetEntryWithCfId(uint32_t column_family_id) {
|
void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
|
||||||
|
if (!UpdateExistingEntryWithCfId(0, key)) {
|
||||||
|
AddNewEntry(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
|
||||||
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
|
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
|
||||||
auto* index_entry = new (mem)
|
auto* index_entry =
|
||||||
WriteBatchIndexEntry(write_batch.GetDataSize(), column_family_id);
|
new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id);
|
||||||
return index_entry;
|
skip_list.Insert(index_entry);
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
|
Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
|
||||||
WriteType* type, Slice* Key,
|
WriteType* type, Slice* Key,
|
||||||
@ -191,8 +256,9 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
|
|||||||
}
|
}
|
||||||
|
|
||||||
WriteBatchWithIndex::WriteBatchWithIndex(
|
WriteBatchWithIndex::WriteBatchWithIndex(
|
||||||
const Comparator* default_index_comparator, size_t reserved_bytes)
|
const Comparator* default_index_comparator, size_t reserved_bytes,
|
||||||
: rep(new Rep(default_index_comparator, reserved_bytes)) {}
|
bool overwrite_key)
|
||||||
|
: rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {}
|
||||||
|
|
||||||
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
|
WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; }
|
||||||
|
|
||||||
@ -210,28 +276,28 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
|
|||||||
|
|
||||||
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
|
void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family,
|
||||||
const Slice& key, const Slice& value) {
|
const Slice& key, const Slice& value) {
|
||||||
auto* index_entry = rep->GetEntry(column_family);
|
rep->SetLastEntryOffset();
|
||||||
rep->write_batch.Put(column_family, key, value);
|
rep->write_batch.Put(column_family, key, value);
|
||||||
rep->skip_list.Insert(index_entry);
|
rep->AddOrUpdateIndex(column_family, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
|
void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
|
||||||
auto* index_entry = rep->GetEntryWithCfId(0);
|
rep->SetLastEntryOffset();
|
||||||
rep->write_batch.Put(key, value);
|
rep->write_batch.Put(key, value);
|
||||||
rep->skip_list.Insert(index_entry);
|
rep->AddOrUpdateIndex(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
|
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
|
||||||
const Slice& key, const Slice& value) {
|
const Slice& key, const Slice& value) {
|
||||||
auto* index_entry = rep->GetEntry(column_family);
|
rep->SetLastEntryOffset();
|
||||||
rep->write_batch.Merge(column_family, key, value);
|
rep->write_batch.Merge(column_family, key, value);
|
||||||
rep->skip_list.Insert(index_entry);
|
rep->AddOrUpdateIndex(column_family, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
|
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
|
||||||
auto* index_entry = rep->GetEntryWithCfId(0);
|
rep->SetLastEntryOffset();
|
||||||
rep->write_batch.Merge(key, value);
|
rep->write_batch.Merge(key, value);
|
||||||
rep->skip_list.Insert(index_entry);
|
rep->AddOrUpdateIndex(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
|
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
|
||||||
@ -240,28 +306,15 @@ void WriteBatchWithIndex::PutLogData(const Slice& blob) {
|
|||||||
|
|
||||||
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
|
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
|
||||||
const Slice& key) {
|
const Slice& key) {
|
||||||
auto* index_entry = rep->GetEntry(column_family);
|
rep->SetLastEntryOffset();
|
||||||
rep->write_batch.Delete(column_family, key);
|
rep->write_batch.Delete(column_family, key);
|
||||||
rep->skip_list.Insert(index_entry);
|
rep->AddOrUpdateIndex(column_family, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WriteBatchWithIndex::Delete(const Slice& key) {
|
void WriteBatchWithIndex::Delete(const Slice& key) {
|
||||||
auto* index_entry = rep->GetEntryWithCfId(0);
|
rep->SetLastEntryOffset();
|
||||||
rep->write_batch.Delete(key);
|
rep->write_batch.Delete(key);
|
||||||
rep->skip_list.Insert(index_entry);
|
rep->AddOrUpdateIndex(key);
|
||||||
}
|
|
||||||
|
|
||||||
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
|
|
||||||
const SliceParts& key) {
|
|
||||||
auto* index_entry = rep->GetEntry(column_family);
|
|
||||||
rep->write_batch.Delete(column_family, key);
|
|
||||||
rep->skip_list.Insert(index_entry);
|
|
||||||
}
|
|
||||||
|
|
||||||
void WriteBatchWithIndex::Delete(const SliceParts& key) {
|
|
||||||
auto* index_entry = rep->GetEntryWithCfId(0);
|
|
||||||
rep->write_batch.Delete(key);
|
|
||||||
rep->skip_list.Insert(index_entry);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int WriteBatchEntryComparator::operator()(
|
int WriteBatchEntryComparator::operator()(
|
||||||
@ -298,14 +351,7 @@ int WriteBatchEntryComparator::operator()(
|
|||||||
key2 = *(entry2->search_key);
|
key2 = *(entry2->search_key);
|
||||||
}
|
}
|
||||||
|
|
||||||
int cmp;
|
int cmp = CompareKey(entry1->column_family, key1, key2);
|
||||||
auto comparator_for_cf = cf_comparator_map_.find(entry1->column_family);
|
|
||||||
if (comparator_for_cf != cf_comparator_map_.end()) {
|
|
||||||
cmp = comparator_for_cf->second->Compare(key1, key2);
|
|
||||||
} else {
|
|
||||||
cmp = default_comparator_->Compare(key1, key2);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cmp != 0) {
|
if (cmp != 0) {
|
||||||
return cmp;
|
return cmp;
|
||||||
} else if (entry1->offset > entry2->offset) {
|
} else if (entry1->offset > entry2->offset) {
|
||||||
@ -316,4 +362,15 @@ int WriteBatchEntryComparator::operator()(
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
|
||||||
|
const Slice& key1,
|
||||||
|
const Slice& key2) const {
|
||||||
|
auto comparator_for_cf = cf_comparator_map_.find(column_family);
|
||||||
|
if (comparator_for_cf != cf_comparator_map_.end()) {
|
||||||
|
return comparator_for_cf->second->Compare(key1, key2);
|
||||||
|
} else {
|
||||||
|
return default_comparator_->Compare(key1, key2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -318,6 +318,82 @@ TEST(WriteBatchWithIndexTest, TestComparatorForCF) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(WriteBatchWithIndexTest, TestOverwriteKey) {
|
||||||
|
ColumnFamilyHandleImplDummy cf1(6, nullptr);
|
||||||
|
ColumnFamilyHandleImplDummy reverse_cf(66, ReverseBytewiseComparator());
|
||||||
|
ColumnFamilyHandleImplDummy cf2(88, BytewiseComparator());
|
||||||
|
WriteBatchWithIndex batch(BytewiseComparator(), 20, true);
|
||||||
|
|
||||||
|
batch.Put(&cf1, "ddd", "");
|
||||||
|
batch.Merge(&cf1, "ddd", "");
|
||||||
|
batch.Delete(&cf1, "ddd");
|
||||||
|
batch.Put(&cf2, "aaa", "");
|
||||||
|
batch.Delete(&cf2, "aaa");
|
||||||
|
batch.Put(&cf2, "aaa", "aaa");
|
||||||
|
batch.Put(&cf2, "eee", "eee");
|
||||||
|
batch.Put(&cf1, "ccc", "");
|
||||||
|
batch.Put(&reverse_cf, "a11", "");
|
||||||
|
batch.Delete(&cf1, "ccc");
|
||||||
|
batch.Put(&reverse_cf, "a33", "a33");
|
||||||
|
batch.Put(&reverse_cf, "a11", "a11");
|
||||||
|
batch.Delete(&reverse_cf, "a33");
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf1));
|
||||||
|
iter->Seek("");
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("ccc", iter->Entry().key.ToString());
|
||||||
|
ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("ddd", iter->Entry().key.ToString());
|
||||||
|
ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(!iter->Valid());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&cf2));
|
||||||
|
iter->Seek("");
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("aaa", iter->Entry().key.ToString());
|
||||||
|
ASSERT_EQ("aaa", iter->Entry().value.ToString());
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("eee", iter->Entry().key.ToString());
|
||||||
|
ASSERT_EQ("eee", iter->Entry().value.ToString());
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(!iter->Valid());
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
std::unique_ptr<WBWIIterator> iter(batch.NewIterator(&reverse_cf));
|
||||||
|
iter->Seek("");
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(!iter->Valid());
|
||||||
|
|
||||||
|
iter->Seek("z");
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("a33", iter->Entry().key.ToString());
|
||||||
|
ASSERT_TRUE(iter->Entry().type == WriteType::kDeleteRecord);
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(iter->Valid());
|
||||||
|
ASSERT_EQ("a11", iter->Entry().key.ToString());
|
||||||
|
ASSERT_EQ("a11", iter->Entry().value.ToString());
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
ASSERT_TRUE(!iter->Valid());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
|
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }
|
||||||
|
Loading…
Reference in New Issue
Block a user