Store range tombstones in memtable
Summary:
- Store range tombstones in a separate MemTableRep instantiated with ColumnFamilyOptions::memtable_factory
- MemTable::NewRangeTombstoneIterator() returns a MemTableIterator over the separate MemTableRep
- Part of the read path is not implemented yet (i.e., MemTable::Get())

Test Plan: see unit tests

Reviewers: wanning

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D62217
Parent: 3c21c64c78
Commit: 6009c473c7
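For orientation, here is a minimal sketch of the write/read split this commit introduces, modeled on the unit tests in the diff below (it assumes a `memtable` set up as in MemTableTest::Simple and the RocksDB-internal test headers; it is not a standalone program):

    // Point entries and range tombstones now land in separate MemTableReps.
    WriteBatch batch;
    batch.Put(std::string("k1"), std::string("v1"));        // -> table_
    batch.DeleteRange(std::string("a"), std::string("z"));  // -> range_del_table_
    ColumnFamilyMemTablesDefault cf_mems_default(memtable);
    ASSERT_TRUE(
        WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok());

    Arena arena;
    // NewIterator() drains only point entries ("k1"); the new
    // NewRangeTombstoneIterator() drains only tombstones (["a", "z")).
    ScopedArenaIterator range_iter(
        memtable->NewRangeTombstoneIterator(ReadOptions(), &arena));
    for (range_iter->SeekToFirst(); range_iter->Valid(); range_iter->Next()) {
      // range_iter->key():   internal key built from the start key "a"
      // range_iter->value(): the end key "z"
    }
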
@@ -557,9 +557,9 @@ TEST_F(DBPropertiesTest, NumImmutableMemTable) {
   ASSERT_EQ(num, "3");
   ASSERT_TRUE(dbfull()->GetProperty(
       handles_[1], "rocksdb.cur-size-active-mem-table", &num));
-  // "192" is the size of the metadata of an empty skiplist, this would
-  // break if we change the default skiplist implementation
-  ASSERT_EQ(num, "192");
+  // "384" is the size of the metadata of two empty skiplists, this would
+  // break if we change the default vectorrep/skiplist implementation
+  ASSERT_EQ(num, "384");
 
   uint64_t int_num;
   uint64_t base_total_size;

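The expected property value doubles because the memtable now owns two empty MemTableReps, the point-key table and the new range-del table, each contributing the ~192 bytes of empty-skiplist metadata. A quick sanity check of the arithmetic (the constant is an implementation detail of the default rep and may drift):

    #include <cstddef>
    constexpr size_t kEmptySkipListUsage = 192;   // metadata of one empty rep
    static_assert(2 * kEmptySkipListUsage == 384, // two reps after this change
                  "point table + range_del table");
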
@@ -68,6 +68,9 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
       table_(ioptions.memtable_factory->CreateMemTableRep(
           comparator_, &allocator_, ioptions.prefix_extractor,
           ioptions.info_log)),
+      range_del_table_(ioptions.memtable_factory->CreateMemTableRep(
+          comparator_, &allocator_, nullptr /* transform */,
+          ioptions.info_log)),
       data_size_(0),
       num_entries_(0),
       num_deletes_(0),

@@ -101,6 +104,7 @@ MemTable::~MemTable() { assert(refs_ == 0); }
 size_t MemTable::ApproximateMemoryUsage() {
   size_t arena_usage = arena_.ApproximateMemoryUsage();
   size_t table_usage = table_->ApproximateMemoryUsage();
+  table_usage += range_del_table_->ApproximateMemoryUsage();
   // let MAX_USAGE = std::numeric_limits<size_t>::max()
   // then if arena_usage + total_usage >= MAX_USAGE, return MAX_USAGE.
   // the following variation is to avoid numeric overflow.

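The comment above describes a saturating add performed without ever computing the sum that could wrap around. A self-contained sketch of the same idea (hypothetical helper, not part of the diff):

    #include <cstddef>
    #include <limits>

    // Overflow-safe form of "a + b, capped at SIZE_MAX": compare b against
    // the remaining headroom below the maximum instead of computing a + b.
    size_t SaturatingAdd(size_t a, size_t b) {
      const size_t kMaxUsage = std::numeric_limits<size_t>::max();
      if (b >= kMaxUsage - a) {  // same as a + b >= kMaxUsage, without wrap
        return kMaxUsage;
      }
      return a + b;
    }
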
@@ -122,8 +126,9 @@ bool MemTable::ShouldFlushNow() const {
 
   // If arena still have room for new block allocation, we can safely say it
   // shouldn't flush.
-  auto allocated_memory =
-      table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
+  auto allocated_memory = table_->ApproximateMemoryUsage() +
+                          range_del_table_->ApproximateMemoryUsage() +
+                          arena_.MemoryAllocatedBytes();
 
   // if we can still allocate one more block without exceeding the
   // over-allocation ratio, then we should not flush.

@@ -219,14 +224,16 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
 class MemTableIterator : public InternalIterator {
  public:
   MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
-                   Arena* arena)
+                   Arena* arena, bool use_range_del_table = false)
       : bloom_(nullptr),
         prefix_extractor_(mem.prefix_extractor_),
         comparator_(mem.comparator_),
         valid_(false),
         arena_mode_(arena != nullptr),
         value_pinned_(!mem.GetMemTableOptions()->inplace_update_support) {
-    if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
+    if (use_range_del_table) {
+      iter_ = mem.range_del_table_->GetIterator(arena);
+    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
       bloom_ = mem.prefix_bloom_.get();
       iter_ = mem.table_->GetDynamicPrefixIterator(arena);
     } else {

@@ -356,6 +363,14 @@ InternalIterator* MemTable::NewIterator(const ReadOptions& read_options,
   return new (mem) MemTableIterator(*this, read_options, arena);
 }
 
+InternalIterator* MemTable::NewRangeTombstoneIterator(
+    const ReadOptions& read_options, Arena* arena) {
+  assert(arena != nullptr);
+  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
+  return new (mem) MemTableIterator(*this, read_options, arena,
+                                    true /* use_range_del_table */);
+}
+
 port::RWMutex* MemTable::GetLock(const Slice& key) {
   static murmur_hash hash;
   return &locks_[hash(key) % locks_.size()];

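Each entry in range_del_table_ stores the tombstone's start key (tagged with its sequence number and kTypeRangeDeletion) as the internal key and the end key as the value, so a caller can decode tombstones roughly like this (a sketch using the same ParseInternalKey helper the tests below use; `mem` is assumed to be a MemTable*):

    Arena arena;
    ScopedArenaIterator it(mem->NewRangeTombstoneIterator(ReadOptions(), &arena));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      ParsedInternalKey parsed;
      if (ParseInternalKey(it->key(), &parsed)) {
        // parsed.user_key: start of the deleted range (inclusive)
        // it->value():     end of the deleted range (exclusive)
        // parsed.sequence: orders this tombstone against other writes
      }
    }
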
@@ -364,6 +379,7 @@ port::RWMutex* MemTable::GetLock(const Slice& key) {
 uint64_t MemTable::ApproximateSize(const Slice& start_ikey,
                                    const Slice& end_ikey) {
   uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
+  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
   if (entry_count == 0) {
     return 0;
   }

@@ -372,9 +388,9 @@ uint64_t MemTable::ApproximateSize(const Slice& start_ikey,
     return 0;
   }
   if (entry_count > n) {
-    // table_->ApproximateNumEntries() is just an estimate so it can be larger
-    // than actual entries we have. Cap it to entries we have to limit the
-    // inaccuracy.
+    // (range_del_)table_->ApproximateNumEntries() is just an estimate so it can
+    // be larger than actual entries we have. Cap it to entries we have to limit
+    // the inaccuracy.
     entry_count = n;
   }
   uint64_t data_size = data_size_.load(std::memory_order_relaxed);

@@ -397,7 +413,9 @@ void MemTable::Add(SequenceNumber s, ValueType type,
                              internal_key_size + VarintLength(val_size) +
                              val_size;
   char* buf = nullptr;
-  KeyHandle handle = table_->Allocate(encoded_len, &buf);
+  std::unique_ptr<MemTableRep>& table =
+      type == kTypeRangeDeletion ? range_del_table_ : table_;
+  KeyHandle handle = table->Allocate(encoded_len, &buf);
 
   char* p = EncodeVarint32(buf, internal_key_size);
   memcpy(p, key.data(), key_size);

@@ -409,7 +427,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
   memcpy(p, value.data(), val_size);
   assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
   if (!allow_concurrent) {
-    table_->Insert(handle);
+    table->Insert(handle);
 
     // this is a bit ugly, but is the way to avoid locked instructions
     // when incrementing an atomic

@@ -441,7 +459,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
     assert(post_process_info == nullptr);
     UpdateFlushState();
   } else {
-    table_->InsertConcurrently(handle);
+    table->InsertConcurrently(handle);
 
     assert(post_process_info != nullptr);
     post_process_info->num_entries++;

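Both tables share the entry encoding that Add() writes, which is why a tombstone's end key can simply travel in the value slot. A sketch of the length computation mirrored from the lines above (VarintLength as in util/coding.h; the fixed 8 bytes hold the packed sequence-and-type tag):

    // Layout per entry: varint32 internal_key_size | user key bytes |
    // 8-byte (sequence << 8 | type) tag | varint32 value_size | value bytes.
    size_t EncodedEntryLen(size_t key_size, size_t val_size) {
      size_t internal_key_size = key_size + 8;  // user key + packed tag
      return VarintLength(internal_key_size) + internal_key_size +
             VarintLength(val_size) + val_size;
    }
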
@@ -158,6 +158,9 @@ class MemTable {
   // those allocated in arena.
   InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena);
 
+  InternalIterator* NewRangeTombstoneIterator(const ReadOptions& read_options,
+                                              Arena* arena);
+
   // Add an entry into memtable that maps key to value at the
   // specified sequence number and with the specified type.
   // Typically value will be empty if type==kTypeDeletion.

@@ -344,6 +347,7 @@ class MemTable {
   ConcurrentArena arena_;
   MemTableAllocator allocator_;
   unique_ptr<MemTableRep> table_;
+  unique_ptr<MemTableRep> range_del_table_;
 
   // Total data size of all data inserted
   std::atomic<uint64_t> data_size_;

@@ -43,60 +43,65 @@ static std::string PrintContents(WriteBatch* b) {
   int single_delete_count = 0;
   int delete_range_count = 0;
   int merge_count = 0;
-  Arena arena;
-  ScopedArenaIterator iter(mem->NewIterator(ReadOptions(), &arena));
-  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-    ParsedInternalKey ikey;
-    memset((void *)&ikey, 0, sizeof(ikey));
-    EXPECT_TRUE(ParseInternalKey(iter->key(), &ikey));
-    switch (ikey.type) {
-      case kTypeValue:
-        state.append("Put(");
-        state.append(ikey.user_key.ToString());
-        state.append(", ");
-        state.append(iter->value().ToString());
-        state.append(")");
-        count++;
-        put_count++;
-        break;
-      case kTypeDeletion:
-        state.append("Delete(");
-        state.append(ikey.user_key.ToString());
-        state.append(")");
-        count++;
-        delete_count++;
-        break;
-      case kTypeSingleDeletion:
-        state.append("SingleDelete(");
-        state.append(ikey.user_key.ToString());
-        state.append(")");
-        count++;
-        single_delete_count++;
-        break;
-      case kTypeRangeDeletion:
-        state.append("DeleteRange(");
-        state.append(ikey.user_key.ToString());
-        state.append(", ");
-        state.append(iter->value().ToString());
-        state.append(")");
-        count++;
-        delete_range_count++;
-        break;
-      case kTypeMerge:
-        state.append("Merge(");
-        state.append(ikey.user_key.ToString());
-        state.append(", ");
-        state.append(iter->value().ToString());
-        state.append(")");
-        count++;
-        merge_count++;
-        break;
-      default:
-        assert(false);
-        break;
-    }
-    state.append("@");
-    state.append(NumberToString(ikey.sequence));
-  }
+  for (int i = 0; i < 2; ++i) {
+    Arena arena;
+    auto iter =
+        i == 0 ? ScopedArenaIterator(mem->NewIterator(ReadOptions(), &arena))
+               : ScopedArenaIterator(
+                     mem->NewRangeTombstoneIterator(ReadOptions(), &arena));
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      ParsedInternalKey ikey;
+      memset((void*)&ikey, 0, sizeof(ikey));
+      EXPECT_TRUE(ParseInternalKey(iter->key(), &ikey));
+      switch (ikey.type) {
+        case kTypeValue:
+          state.append("Put(");
+          state.append(ikey.user_key.ToString());
+          state.append(", ");
+          state.append(iter->value().ToString());
+          state.append(")");
+          count++;
+          put_count++;
+          break;
+        case kTypeDeletion:
+          state.append("Delete(");
+          state.append(ikey.user_key.ToString());
+          state.append(")");
+          count++;
+          delete_count++;
+          break;
+        case kTypeSingleDeletion:
+          state.append("SingleDelete(");
+          state.append(ikey.user_key.ToString());
+          state.append(")");
+          count++;
+          single_delete_count++;
+          break;
+        case kTypeRangeDeletion:
+          state.append("DeleteRange(");
+          state.append(ikey.user_key.ToString());
+          state.append(", ");
+          state.append(iter->value().ToString());
+          state.append(")");
+          count++;
+          delete_range_count++;
+          break;
+        case kTypeMerge:
+          state.append("Merge(");
+          state.append(ikey.user_key.ToString());
+          state.append(", ");
+          state.append(iter->value().ToString());
+          state.append(")");
+          count++;
+          merge_count++;
+          break;
+        default:
+          assert(false);
+          break;
+      }
+      state.append("@");
+      state.append(NumberToString(ikey.sequence));
+    }
+  }
   EXPECT_EQ(b->HasPut(), put_count > 0);
   EXPECT_EQ(b->HasDelete(), delete_count > 0);

@@ -131,10 +136,10 @@ TEST_F(WriteBatchTest, Multiple) {
   ASSERT_EQ(100U, WriteBatchInternal::Sequence(&batch));
   ASSERT_EQ(4, WriteBatchInternal::Count(&batch));
   ASSERT_EQ(
-      "DeleteRange(bar, foo)@102"
       "Put(baz, boo)@103"
       "Delete(box)@101"
-      "Put(foo, bar)@100",
+      "Put(foo, bar)@100"
+      "DeleteRange(bar, foo)@102",
       PrintContents(&batch));
   ASSERT_EQ(4, batch.Count());
 }

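The expected string changes because PrintContents() (rewritten above) now drains the point-key iterator first and the range-tombstone iterator second, so the DeleteRange entry moves from its key-ordered position to the end. The new concatenation, shown one entry per line for readability:

    Put(baz, boo)@103
    Delete(box)@101
    Put(foo, bar)@100
    DeleteRange(bar, foo)@102
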
@@ -623,6 +623,11 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
+    // TODO(wanning&andrewkr) add num_tomestone to table properties
+    r->range_del_block.Add(key, value);
+    ++r->props.num_entries;
+    r->props.raw_key_size += key.size();
+    r->props.raw_value_size += value.size();
     NotifyCollectTableCollectorsOnAdd(key, value, r->offset,
                                       r->table_properties_collectors,
                                       r->ioptions.info_log);
   } else {
     assert(false);
   }

@@ -2432,18 +2432,25 @@ TEST_F(MemTableTest, Simple) {
   batch.Put(std::string("k2"), std::string("v2"));
   batch.Put(std::string("k3"), std::string("v3"));
   batch.Put(std::string("largekey"), std::string("vlarge"));
+  batch.DeleteRange(std::string("chi"), std::string("xigua"));
+  batch.DeleteRange(std::string("begin"), std::string("end"));
   ColumnFamilyMemTablesDefault cf_mems_default(memtable);
   ASSERT_TRUE(
       WriteBatchInternal::InsertInto(&batch, &cf_mems_default, nullptr).ok());
 
-  Arena arena;
-  ScopedArenaIterator iter(memtable->NewIterator(ReadOptions(), &arena));
-  iter->SeekToFirst();
-  while (iter->Valid()) {
-    fprintf(stderr, "key: '%s' -> '%s'\n",
-            iter->key().ToString().c_str(),
-            iter->value().ToString().c_str());
-    iter->Next();
-  }
+  for (int i = 0; i < 2; ++i) {
+    Arena arena;
+    ScopedArenaIterator iter =
+        i == 0
+            ? ScopedArenaIterator(memtable->NewIterator(ReadOptions(), &arena))
+            : ScopedArenaIterator(
+                  memtable->NewRangeTombstoneIterator(ReadOptions(), &arena));
+    iter->SeekToFirst();
+    while (iter->Valid()) {
+      fprintf(stderr, "key: '%s' -> '%s'\n", iter->key().ToString().c_str(),
+              iter->value().ToString().c_str());
+      iter->Next();
+    }
+  }
 
   delete memtable->Unref();