WriteBatch support for range deletion

Summary:
Add API to WriteBatch to store range deletions in its buffer
which are later added to memtable. In the WriteBatch buffer, a range
deletion is encoded as "<optype><CF ID (optional)><begin key><end key>".

With this diff, the range tombstones are stored inline with the data in
the memtable. It's useful for now because the test cases rely on the
data being accessible via memtable. My next step is to store range
tombstones in a separate area in the memtable.

Test Plan: unit tests

Reviewers: IslamAbdelRahman, sdong, wanning

Reviewed By: wanning

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D61401
This commit is contained in:
Andrew Kryczka 2016-08-16 08:16:04 -07:00
parent 236756f2cf
commit 3771e37970
12 changed files with 251 additions and 22 deletions

View File

@ -22,7 +22,7 @@ namespace rocksdb {
uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(IsValueType(t));
assert(IsExtendedValueType(t));
return (seq << 8) | t;
}
@ -31,7 +31,7 @@ void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) {
*t = static_cast<ValueType>(packed & 0xff);
assert(*seq <= kMaxSequenceNumber);
assert(IsValueType(*t));
assert(IsExtendedValueType(*t));
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {

View File

@ -10,6 +10,7 @@
#pragma once
#include <stdio.h>
#include <string>
#include <utility>
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
@ -44,6 +45,8 @@ enum ValueType : unsigned char {
kTypeCommitXID = 0xB, // WAL only.
kTypeRollbackXID = 0xC, // WAL only.
kTypeNoop = 0xD, // WAL only.
kTypeColumnFamilyRangeDeletion = 0xE, // WAL only.
kTypeRangeDeletion = 0xF, // meta block
kMaxValue = 0x7F // Not used for storing records.
};
@ -55,12 +58,18 @@ enum ValueType : unsigned char {
// ValueType, not the lowest).
static const ValueType kValueTypeForSeek = kTypeSingleDeletion;
// Checks whether a type is a value type (i.e. a type used in memtables and sst
// files).
// Checks whether a type is an inline value type
// (i.e. a type used in memtable skiplist and sst file datablock).
inline bool IsValueType(ValueType t) {
return t <= kTypeMerge || t == kTypeSingleDeletion;
}
// Checks whether a type is from user operation
// kTypeRangeDeletion is in meta block so this API is separated from above
inline bool IsExtendedValueType(ValueType t) {
return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeRangeDeletion;
}
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
static const SequenceNumber kMaxSequenceNumber =
@ -208,7 +217,7 @@ inline bool ParseInternalKey(const Slice& internal_key,
result->type = static_cast<ValueType>(c);
assert(result->type <= ValueType::kMaxValue);
result->user_key = Slice(internal_key.data(), n - 8);
return IsValueType(result->type);
return IsExtendedValueType(result->type);
}
// Update the sequence number in the internal key.

View File

@ -64,6 +64,7 @@ enum ContentFlags : uint32_t {
HAS_END_PREPARE = 1 << 6,
HAS_COMMIT = 1 << 7,
HAS_ROLLBACK = 1 << 8,
HAS_DELETE_RANGE = 1 << 9,
};
struct BatchContentClassifier : public WriteBatch::Handler {
@ -84,6 +85,11 @@ struct BatchContentClassifier : public WriteBatch::Handler {
return Status::OK();
}
Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE_RANGE;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_MERGE;
return Status::OK();
@ -219,6 +225,10 @@ bool WriteBatch::HasSingleDelete() const {
return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}
bool WriteBatch::HasDeleteRange() const {
return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}
bool WriteBatch::HasMerge() const {
return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}
@ -287,6 +297,18 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
return Status::Corruption("bad WriteBatch Delete");
}
break;
case kTypeColumnFamilyRangeDeletion:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch DeleteRange");
}
// intentional fallthrough
case kTypeRangeDeletion:
// for range delete, "key" is begin_key, "value" is end_key
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch DeleteRange");
}
break;
case kTypeColumnFamilyMerge:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch Merge");
@ -370,6 +392,13 @@ Status WriteBatch::Iterate(Handler* handler) const {
s = handler->SingleDeleteCF(column_family, key);
found++;
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
s = handler->DeleteRangeCF(column_family, key, value);
found++;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
assert(content_flags_.load(std::memory_order_relaxed) &
@ -598,6 +627,53 @@ void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
WriteBatchInternal::SingleDelete(this, GetColumnFamilyID(column_family), key);
}
void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, begin_key);
PutLengthPrefixedSlice(&b->rep_, end_key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE_RANGE,
std::memory_order_relaxed);
}
void WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
begin_key, end_key);
}
void WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
const SliceParts& begin_key,
const SliceParts& end_key) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, begin_key);
PutLengthPrefixedSliceParts(&b->rep_, end_key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE_RANGE,
std::memory_order_relaxed);
}
void WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key,
const SliceParts& end_key) {
WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
begin_key, end_key);
}
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
@ -836,9 +912,9 @@ class MemTableInserter : public WriteBatch::Handler {
}
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
ValueType delete_type) {
const Slice& value, ValueType delete_type) {
MemTable* mem = cf_mems_->GetMemTable();
mem->Add(sequence_, delete_type, key, Slice(), concurrent_memtable_writes_,
mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
sequence_++;
CheckMemtableFull();
@ -858,7 +934,7 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
return DeleteImpl(column_family_id, key, kTypeDeletion);
return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
@ -874,7 +950,25 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
}
virtual Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,

View File

@ -56,6 +56,23 @@ void WriteBatchBase::SingleDelete(const SliceParts& key) {
SingleDelete(key_slice);
}
void WriteBatchBase::DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key,
const SliceParts& end_key) {
std::string begin_key_buf, end_key_buf;
Slice begin_key_slice(begin_key, &begin_key_buf);
Slice end_key_slice(end_key, &end_key_buf);
DeleteRange(column_family, begin_key_slice, end_key_slice);
}
void WriteBatchBase::DeleteRange(const SliceParts& begin_key,
const SliceParts& end_key) {
std::string begin_key_buf, end_key_buf;
Slice begin_key_slice(begin_key, &begin_key_buf);
Slice end_key_slice(end_key, &end_key_buf);
DeleteRange(begin_key_slice, end_key_slice);
}
void WriteBatchBase::Merge(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf;

View File

@ -86,6 +86,13 @@ class WriteBatchInternal {
static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key);
static void DeleteRange(WriteBatch* b, uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key);
static void DeleteRange(WriteBatch* b, uint32_t column_family_id,
const SliceParts& begin_key,
const SliceParts& end_key);
static void Merge(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);

View File

@ -42,6 +42,7 @@ static std::string PrintContents(WriteBatch* b) {
int put_count = 0;
int delete_count = 0;
int single_delete_count = 0;
int delete_range_count = 0;
int merge_count = 0;
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena));
@ -73,6 +74,15 @@ static std::string PrintContents(WriteBatch* b) {
count++;
single_delete_count++;
break;
case kTypeRangeDeletion:
state.append("DeleteRange(");
state.append(ikey.user_key.ToString());
state.append(", ");
state.append(iter->value().ToString());
state.append(")");
count++;
delete_range_count++;
break;
case kTypeMerge:
state.append("Merge(");
state.append(ikey.user_key.ToString());
@ -92,6 +102,7 @@ static std::string PrintContents(WriteBatch* b) {
EXPECT_EQ(b->HasPut(), put_count > 0);
EXPECT_EQ(b->HasDelete(), delete_count > 0);
EXPECT_EQ(b->HasSingleDelete(), single_delete_count > 0);
EXPECT_EQ(b->HasDeleteRange(), delete_range_count > 0);
EXPECT_EQ(b->HasMerge(), merge_count > 0);
if (!s.ok()) {
state.append(s.ToString());
@ -115,15 +126,18 @@ TEST_F(WriteBatchTest, Multiple) {
WriteBatch batch;
batch.Put(Slice("foo"), Slice("bar"));
batch.Delete(Slice("box"));
batch.DeleteRange(Slice("bar"), Slice("foo"));
batch.Put(Slice("baz"), Slice("boo"));
WriteBatchInternal::SetSequence(&batch, 100);
ASSERT_EQ(100U, WriteBatchInternal::Sequence(&batch));
ASSERT_EQ(3, WriteBatchInternal::Count(&batch));
ASSERT_EQ("Put(baz, boo)@102"
ASSERT_EQ(4, WriteBatchInternal::Count(&batch));
ASSERT_EQ(
"DeleteRange(bar, foo)@102"
"Put(baz, boo)@103"
"Delete(box)@101"
"Put(foo, bar)@100",
PrintContents(&batch));
ASSERT_EQ(3, batch.Count());
ASSERT_EQ(4, batch.Count());
}
TEST_F(WriteBatchTest, Corruption) {
@ -218,6 +232,18 @@ namespace {
}
return Status::OK();
}
virtual Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) override {
if (column_family_id == 0) {
seen += "DeleteRange(" + begin_key.ToString() + ", " +
end_key.ToString() + ")";
} else {
seen += "DeleteRangeCF(" + ToString(column_family_id) + ", " +
begin_key.ToString() + ", " + end_key.ToString() + ")";
}
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (column_family_id == 0) {
@ -563,6 +589,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
batch.Delete(&eight, Slice("eightfoo"));
batch.SingleDelete(&two, Slice("twofoo"));
batch.DeleteRange(&two, Slice("3foo"), Slice("4foo"));
batch.Merge(&three, Slice("threethree"), Slice("3three"));
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Merge(Slice("omom"), Slice("nom"));
@ -575,6 +602,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
"PutCF(8, eightfoo, bar8)"
"DeleteCF(8, eightfoo)"
"SingleDeleteCF(2, twofoo)"
"DeleteRangeCF(2, 3foo, 4foo)"
"MergeCF(3, threethree, 3three)"
"Put(foo, bar)"
"Merge(omom, nom)",
@ -590,6 +618,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
batch.Delete(&eight, Slice("eightfoo"));
batch.SingleDelete(&two, Slice("twofoo"));
batch.DeleteRange(&two, Slice("twofoo"), Slice("threefoo"));
batch.Merge(&three, Slice("threethree"), Slice("3three"));
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Merge(Slice("omom"), Slice("nom"));
@ -628,6 +657,13 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
ASSERT_EQ(WriteType::kSingleDeleteRecord, iter->Entry().type);
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(WriteType::kDeleteRangeRecord, iter->Entry().type);
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
ASSERT_EQ("threefoo", iter->Entry().value.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
@ -678,6 +714,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
"PutCF(8, eightfoo, bar8)"
"DeleteCF(8, eightfoo)"
"SingleDeleteCF(2, twofoo)"
"DeleteRangeCF(2, twofoo, threefoo)"
"MergeCF(3, threethree, 3three)"
"Put(foo, bar)"
"Merge(omom, nom)",

View File

@ -185,8 +185,8 @@ void WriteThread::SetState(Writer* w, uint8_t new_state) {
void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
assert(w->state == STATE_INIT);
Writer* writers = newest_writer_.load(std::memory_order_relaxed);
while (true) {
Writer* writers = newest_writer_.load(std::memory_order_relaxed);
w->link_older = writers;
if (newest_writer_.compare_exchange_strong(writers, w)) {
if (writers == nullptr) {

View File

@ -34,6 +34,7 @@ enum WriteType {
kMergeRecord,
kDeleteRecord,
kSingleDeleteRecord,
kDeleteRangeRecord,
kLogDataRecord,
kXIDRecord,
};
@ -113,6 +114,11 @@ class WriteBatchWithIndex : public WriteBatchBase {
const Slice& key) override;
void SingleDelete(const Slice& key) override;
using WriteBatchBase::DeleteRange;
void DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key) override;
void DeleteRange(const Slice& begin_key, const Slice& end_key) override;
using WriteBatchBase::PutLogData;
void PutLogData(const Slice& blob) override;

View File

@ -84,6 +84,23 @@ class WriteBatch : public WriteBatchBase {
SingleDelete(nullptr, key);
}
using WriteBatchBase::DeleteRange;
// WriteBatch implementation of DB::DeleteRange(). See db.h.
void DeleteRange(ColumnFamilyHandle* column_family, const Slice& begin_key,
const Slice& end_key) override;
void DeleteRange(const Slice& begin_key, const Slice& end_key) override {
DeleteRange(nullptr, begin_key, end_key);
}
// variant that takes SliceParts
void DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key,
const SliceParts& end_key) override;
void DeleteRange(const SliceParts& begin_key,
const SliceParts& end_key) override {
DeleteRange(nullptr, begin_key, end_key);
}
using WriteBatchBase::Merge;
// Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)"
@ -132,6 +149,10 @@ class WriteBatch : public WriteBatchBase {
class Handler {
public:
virtual ~Handler();
// All handler functions in this class provide default implementations so
// we won't break existing clients of Handler on a source code level when
// adding a new member function.
// default implementation will just call Put without column family for
// backwards compatibility. If the column family is not default,
// the function is noop
@ -169,9 +190,11 @@ class WriteBatch : public WriteBatchBase {
}
virtual void SingleDelete(const Slice& /*key*/) {}
// Merge and LogData are not pure virtual. Otherwise, we would break
// existing clients of Handler on a source code level. The default
// implementation of Merge does nothing.
virtual Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key) {
return Status::InvalidArgument("DeleteRangeCF not implemented");
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (column_family_id == 0) {
@ -228,7 +251,10 @@ class WriteBatch : public WriteBatchBase {
// Returns true if SingleDeleteCF will be called during Iterate
bool HasSingleDelete() const;
// Returns trie if MergeCF will be called during Iterate
// Returns true if DeleteRangeCF will be called during Iterate
bool HasDeleteRange() const;
// Returns true if MergeCF will be called during Iterate
bool HasMerge() const;
// Returns true if MarkBeginPrepare will be called during Iterate

View File

@ -65,6 +65,19 @@ class WriteBatchBase {
const SliceParts& key);
virtual void SingleDelete(const SliceParts& key);
// If the database contains mappings in the range ["begin_key", "end_key"],
// erase them. Else do nothing.
virtual void DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) = 0;
virtual void DeleteRange(const Slice& begin_key, const Slice& end_key) = 0;
// variant that takes SliceParts
virtual void DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key,
const SliceParts& end_key);
virtual void DeleteRange(const SliceParts& begin_key,
const SliceParts& end_key);
// Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular,
// it will not be persisted to the SST files. When iterating over this

View File

@ -349,7 +349,8 @@ class WBWIIteratorImpl : public WBWIIterator {
iter_entry->offset, &ret.type, &ret.key, &ret.value, &blob, &xid);
assert(s.ok());
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
ret.type == kSingleDeleteRecord || ret.type == kMergeRecord);
ret.type == kSingleDeleteRecord || ret.type == kDeleteRangeRecord ||
ret.type == kMergeRecord);
return ret;
}
@ -628,6 +629,21 @@ void WriteBatchWithIndex::SingleDelete(const Slice& key) {
rep->AddOrUpdateIndex(key);
}
void WriteBatchWithIndex::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key,
const Slice& end_key) {
rep->SetLastEntryOffset();
rep->write_batch.DeleteRange(column_family, begin_key, end_key);
rep->AddOrUpdateIndex(column_family, begin_key);
}
void WriteBatchWithIndex::DeleteRange(const Slice& begin_key,
const Slice& end_key) {
rep->SetLastEntryOffset();
rep->write_batch.DeleteRange(begin_key, end_key);
rep->AddOrUpdateIndex(begin_key);
}
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();

View File

@ -58,6 +58,10 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
case kTypeSingleDeletion:
*type = kSingleDeleteRecord;
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
*type = kDeleteRangeRecord;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
*type = kMergeRecord;