Ability for rocksdb to compact when flushing the in-memory memtable to a file in L0.
Summary: Rocks accumulates recent writes and deletes in the in-memory memtable. When the memtable is full, it writes the contents on the memtable to a file in L0. This patch removes redundant records at the time of the flush. If there are multiple versions of the same key in the memtable, then only the most recent one is dumped into the output file. The purging of redundant records occur only if the most recent snapshot is earlier than the earliest record in the memtable. Should we switch on this feature by default or should we keep this feature turned off in the default settings? Test Plan: Added test case to db_test.cc Reviewers: sheki, vamsi, emayanke, heyongqiang Reviewed By: sheki CC: leveldb Differential Revision: https://reviews.facebook.net/D8991
This commit is contained in:
parent
4992633751
commit
806e264350
@ -19,11 +19,22 @@ Status BuildTable(const std::string& dbname,
|
||||
const Options& options,
|
||||
TableCache* table_cache,
|
||||
Iterator* iter,
|
||||
FileMetaData* meta) {
|
||||
FileMetaData* meta,
|
||||
const Comparator* user_comparator,
|
||||
const SequenceNumber newest_snapshot,
|
||||
const SequenceNumber earliest_seqno_in_memtable) {
|
||||
Status s;
|
||||
meta->file_size = 0;
|
||||
iter->SeekToFirst();
|
||||
|
||||
// If the sequence number of the smallest entry in the memtable is
|
||||
// smaller than the most recent snapshot, then we do not trigger
|
||||
// removal of duplicate/deleted keys as part of this builder.
|
||||
bool purge = options.purge_redundant_kvs_while_flush;
|
||||
if (earliest_seqno_in_memtable <= newest_snapshot) {
|
||||
purge = false;
|
||||
}
|
||||
|
||||
std::string fname = TableFileName(dbname, meta->number);
|
||||
if (iter->Valid()) {
|
||||
unique_ptr<WritableFile> file;
|
||||
@ -31,13 +42,53 @@ Status BuildTable(const std::string& dbname,
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
TableBuilder* builder = new TableBuilder(options, file.get(), 0);
|
||||
meta->smallest.DecodeFrom(iter->key());
|
||||
for (; iter->Valid(); iter->Next()) {
|
||||
Slice key = iter->key();
|
||||
meta->largest.DecodeFrom(key);
|
||||
builder->Add(key, iter->value());
|
||||
|
||||
// the first key is the smallest key
|
||||
Slice key = iter->key();
|
||||
meta->smallest.DecodeFrom(key);
|
||||
|
||||
if (purge) {
|
||||
ParsedInternalKey prev_ikey;
|
||||
std::string prev_value;
|
||||
std::string prev_key;
|
||||
|
||||
// store first key-value
|
||||
prev_key.assign(key.data(), key.size());
|
||||
prev_value.assign(iter->value().data(), iter->value().size());
|
||||
ParseInternalKey(Slice(prev_key), &prev_ikey);
|
||||
assert(prev_ikey.sequence >= earliest_seqno_in_memtable);
|
||||
|
||||
for (iter->Next(); iter->Valid(); iter->Next()) {
|
||||
ParsedInternalKey this_ikey;
|
||||
Slice key = iter->key();
|
||||
ParseInternalKey(key, &this_ikey);
|
||||
assert(this_ikey.sequence >= earliest_seqno_in_memtable);
|
||||
|
||||
if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) {
|
||||
// This key is different from previous key.
|
||||
// Output prev key and remember current key
|
||||
builder->Add(Slice(prev_key), Slice(prev_value));
|
||||
prev_key.assign(key.data(), key.size());
|
||||
prev_value.assign(iter->value().data(), iter->value().size());
|
||||
ParseInternalKey(Slice(prev_key), &prev_ikey);
|
||||
} else {
|
||||
// seqno within the same key are in decreasing order
|
||||
assert(this_ikey.sequence < prev_ikey.sequence);
|
||||
// This key is an earlier version of the same key in prev_key.
|
||||
// Skip current key.
|
||||
}
|
||||
}
|
||||
// output last key
|
||||
builder->Add(Slice(prev_key), Slice(prev_value));
|
||||
meta->largest.DecodeFrom(Slice(prev_key));
|
||||
|
||||
} else {
|
||||
for (; iter->Valid(); iter->Next()) {
|
||||
Slice key = iter->key();
|
||||
meta->largest.DecodeFrom(key);
|
||||
builder->Add(key, iter->value());
|
||||
}
|
||||
}
|
||||
|
||||
// Finish and check for builder errors
|
||||
|
@ -5,7 +5,9 @@
|
||||
#ifndef STORAGE_LEVELDB_DB_BUILDER_H_
|
||||
#define STORAGE_LEVELDB_DB_BUILDER_H_
|
||||
|
||||
#include "leveldb/comparator.h"
|
||||
#include "leveldb/status.h"
|
||||
#include "leveldb/types.h"
|
||||
|
||||
namespace leveldb {
|
||||
|
||||
@ -27,7 +29,10 @@ extern Status BuildTable(const std::string& dbname,
|
||||
const Options& options,
|
||||
TableCache* table_cache,
|
||||
Iterator* iter,
|
||||
FileMetaData* meta);
|
||||
FileMetaData* meta,
|
||||
const Comparator* user_comparator,
|
||||
const SequenceNumber newest_snapshot,
|
||||
const SequenceNumber earliest_seqno_in_memtable);
|
||||
|
||||
} // namespace leveldb
|
||||
|
||||
|
@ -666,13 +666,18 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
|
||||
meta.number = versions_->NewFileNumber();
|
||||
pending_outputs_.insert(meta.number);
|
||||
Iterator* iter = mem->NewIterator();
|
||||
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
||||
const SequenceNumber earliest_seqno_in_memtable =
|
||||
mem->GetFirstSequenceNumber();
|
||||
Log(options_.info_log, "Level-0 table #%llu: started",
|
||||
(unsigned long long) meta.number);
|
||||
|
||||
Status s;
|
||||
{
|
||||
mutex_.Unlock();
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta);
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta,
|
||||
user_comparator(), newest_snapshot,
|
||||
earliest_seqno_in_memtable);
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
@ -710,6 +715,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
|
||||
*filenumber = meta.number;
|
||||
pending_outputs_.insert(meta.number);
|
||||
Iterator* iter = mem->NewIterator();
|
||||
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
||||
const SequenceNumber earliest_seqno_in_memtable =
|
||||
mem->GetFirstSequenceNumber();
|
||||
Log(options_.info_log, "Level-0 flush table #%llu: started",
|
||||
(unsigned long long) meta.number);
|
||||
|
||||
@ -718,7 +726,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
|
||||
Status s;
|
||||
{
|
||||
mutex_.Unlock();
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta);
|
||||
s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta,
|
||||
user_comparator(), newest_snapshot,
|
||||
earliest_seqno_in_memtable);
|
||||
mutex_.Lock();
|
||||
}
|
||||
base->Unref();
|
||||
|
@ -211,6 +211,7 @@ class DBTest {
|
||||
kNumLevel_3,
|
||||
kDBLogDir,
|
||||
kManifestFileSize,
|
||||
kCompactOnFlush,
|
||||
kEnd
|
||||
};
|
||||
int option_config_;
|
||||
@ -268,6 +269,8 @@ class DBTest {
|
||||
break;
|
||||
case kManifestFileSize:
|
||||
options.max_manifest_file_size = 50; // 50 bytes
|
||||
case kCompactOnFlush:
|
||||
options.purge_redundant_kvs_while_flush = !options.purge_redundant_kvs_while_flush;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -1817,7 +1820,11 @@ TEST(DBTest, DeletionMarkers1) {
|
||||
Put("foo", "v2");
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
|
||||
if (CurrentOptions().purge_redundant_kvs_while_flush) {
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
|
||||
} else {
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
|
||||
}
|
||||
Slice z("z");
|
||||
dbfull()->TEST_CompactRange(last-2, nullptr, &z);
|
||||
// DEL eliminated, but v1 remains because we aren't compacting that level
|
||||
@ -2420,6 +2427,90 @@ TEST(DBTest, SnapshotFiles) {
|
||||
dbfull()->DisableFileDeletions();
|
||||
}
|
||||
|
||||
TEST(DBTest, CompactOnFlush) {
|
||||
Options options = CurrentOptions();
|
||||
options.purge_redundant_kvs_while_flush = true;
|
||||
options.disable_auto_compactions = true;
|
||||
Reopen(&options);
|
||||
|
||||
Put("foo", "v1");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v1 ]");
|
||||
|
||||
// Write two new keys
|
||||
Put("a", "begin");
|
||||
Put("z", "end");
|
||||
dbfull()->TEST_CompactMemTable();
|
||||
|
||||
// Case1: Delete followed by a put
|
||||
Delete("foo");
|
||||
Put("foo", "v2");
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
|
||||
|
||||
// After the current memtable is flushed, the DEL should
|
||||
// have been removed
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
|
||||
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
|
||||
|
||||
// Case 2: Delete followed by another delete
|
||||
Delete("foo");
|
||||
Delete("foo");
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, DEL, v2 ]");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v2 ]");
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
|
||||
|
||||
// Case 3: Put followed by a delete
|
||||
Put("foo", "v3");
|
||||
Delete("foo");
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v3 ]");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL ]");
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
|
||||
|
||||
// Case 4: Put followed by another Put
|
||||
Put("foo", "v4");
|
||||
Put("foo", "v5");
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v5, v4 ]");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v5 ]");
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v5 ]");
|
||||
|
||||
// clear database
|
||||
Delete("foo");
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
|
||||
|
||||
// Case 5: Put followed by snapshot followed by another Put
|
||||
// Both puts should remain.
|
||||
Put("foo", "v6");
|
||||
const Snapshot* snapshot = db_->GetSnapshot();
|
||||
Put("foo", "v7");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v7, v6 ]");
|
||||
db_->ReleaseSnapshot(snapshot);
|
||||
|
||||
// clear database
|
||||
Delete("foo");
|
||||
dbfull()->CompactRange(nullptr, nullptr);
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
|
||||
|
||||
// Case 5: snapshot followed by a put followed by another Put
|
||||
// Only the last put should remain.
|
||||
const Snapshot* snapshot1 = db_->GetSnapshot();
|
||||
Put("foo", "v8");
|
||||
Put("foo", "v9");
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
ASSERT_EQ(AllEntriesFor("foo"), "[ v9 ]");
|
||||
db_->ReleaseSnapshot(snapshot1);
|
||||
}
|
||||
|
||||
void ListLogFiles(Env* env,
|
||||
const std::string& path,
|
||||
std::vector<uint64_t>* logFiles) {
|
||||
@ -2898,7 +2989,6 @@ static bool CompareIterators(int step,
|
||||
ok = false;
|
||||
}
|
||||
}
|
||||
fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
|
||||
delete miter;
|
||||
delete dbiter;
|
||||
return ok;
|
||||
@ -2913,9 +3003,6 @@ TEST(DBTest, Randomized) {
|
||||
const Snapshot* db_snap = nullptr;
|
||||
std::string k, v;
|
||||
for (int step = 0; step < N; step++) {
|
||||
if (step % 100 == 0) {
|
||||
fprintf(stderr, "Step %d of %d\n", step, N);
|
||||
}
|
||||
// TODO(sanjay): Test Get() works
|
||||
int p = rnd.Uniform(100);
|
||||
if (p < 45) { // Put
|
||||
|
@ -25,7 +25,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel)
|
||||
flush_in_progress_(false),
|
||||
flush_completed_(false),
|
||||
file_number_(0),
|
||||
edit_(numlevel) {
|
||||
edit_(numlevel),
|
||||
first_seqno_(0) {
|
||||
}
|
||||
|
||||
MemTable::~MemTable() {
|
||||
@ -107,6 +108,12 @@ void MemTable::Add(SequenceNumber s, ValueType type,
|
||||
memcpy(p, value.data(), val_size);
|
||||
assert((p + val_size) - buf == (unsigned)encoded_len);
|
||||
table_.Insert(buf);
|
||||
|
||||
// The first sequence number inserted into the memtable
|
||||
assert(first_seqno_ == 0 || s > first_seqno_);
|
||||
if (first_seqno_ == 0) {
|
||||
first_seqno_ = s;
|
||||
}
|
||||
}
|
||||
|
||||
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
|
||||
|
@ -68,6 +68,10 @@ class MemTable {
|
||||
// Returns the edits area that is needed for flushing the memtable
|
||||
VersionEdit* GetEdits() { return &edit_; }
|
||||
|
||||
// Returns the sequence number of the first element that was inserted
|
||||
// into the memtable
|
||||
SequenceNumber GetFirstSequenceNumber() { return first_seqno_; }
|
||||
|
||||
private:
|
||||
~MemTable(); // Private since only Unref() should be used to delete it
|
||||
|
||||
@ -96,6 +100,9 @@ class MemTable {
|
||||
// memtable is flushed to storage.
|
||||
VersionEdit edit_;
|
||||
|
||||
// The sequence number of the kv that was inserted first
|
||||
SequenceNumber first_seqno_;
|
||||
|
||||
// No copying allowed
|
||||
MemTable(const MemTable&);
|
||||
void operator=(const MemTable&);
|
||||
|
@ -216,7 +216,8 @@ class Repairer {
|
||||
FileMetaData meta;
|
||||
meta.number = next_file_number_++;
|
||||
Iterator* iter = mem->NewIterator();
|
||||
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
|
||||
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta,
|
||||
icmp_.user_comparator(), 0, 0);
|
||||
delete iter;
|
||||
mem->Unref();
|
||||
mem = nullptr;
|
||||
|
@ -70,6 +70,14 @@ class SnapshotList {
|
||||
}
|
||||
}
|
||||
|
||||
// get the sequence number of the most recent snapshot
|
||||
const SequenceNumber GetNewest() {
|
||||
if (empty()) {
|
||||
return 0;
|
||||
}
|
||||
return newest()->number_;
|
||||
}
|
||||
|
||||
private:
|
||||
// Dummy head of doubly-linked list of snapshots
|
||||
SnapshotImpl list_;
|
||||
|
@ -380,6 +380,10 @@ struct Options {
|
||||
// as well as prevent overallocation for mounts that preallocate
|
||||
// large amounts of data (such as xfs's allocsize option).
|
||||
size_t manifest_preallocation_size;
|
||||
|
||||
// Purge duplicate/deleted keys when a memtable is flushed to storage.
|
||||
// Default: true
|
||||
bool purge_redundant_kvs_while_flush;
|
||||
};
|
||||
|
||||
// Options that control read operations
|
||||
|
@ -58,7 +58,8 @@ Options::Options()
|
||||
CompactionFilter(nullptr),
|
||||
disable_auto_compactions(false),
|
||||
WAL_ttl_seconds(0),
|
||||
manifest_preallocation_size(4 * 1024 * 1024) {
|
||||
manifest_preallocation_size(4 * 1024 * 1024),
|
||||
purge_redundant_kvs_while_flush(true) {
|
||||
|
||||
}
|
||||
|
||||
@ -101,6 +102,8 @@ Options::Dump(Logger* log) const
|
||||
Log(log," Options.keep_log_file_num: %ld", keep_log_file_num);
|
||||
Log(log," Options.db_stats_log_interval: %d",
|
||||
db_stats_log_interval);
|
||||
Log(log," Options.purge_redundant_kvs_while_flush: %d",
|
||||
purge_redundant_kvs_while_flush);
|
||||
Log(log," Options.compression_opts.window_bits: %d",
|
||||
compression_opts.window_bits);
|
||||
Log(log," Options.compression_opts.level: %d",
|
||||
|
Loading…
x
Reference in New Issue
Block a user