Zero out redundant sequence numbers for kvs to increase compression efficiency
Summary: The sequence number stored in each record takes up a significant amount of storage space. This optimization zeroes out the sequence numbers of key-value records (kvs) in the bottommost (Lmax) level whose sequence numbers are earlier than the earliest snapshot. Test Plan: Unit test attached. Differential Revision: https://reviews.facebook.net/D8619
This commit is contained in:
parent
27e26df665
commit
4564915446
@ -1508,6 +1508,16 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
earliest_snapshot = compact->existing_snapshots[0];
|
||||
}
|
||||
|
||||
// Is this compaction producing files at the bottommost level?
|
||||
bool bottommost_level = true;
|
||||
for (int i = compact->compaction->level() + 2;
|
||||
i < versions_->NumberLevels(); i++) {
|
||||
if (versions_->NumLevelFiles(i) > 0) {
|
||||
bottommost_level = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Allocate the output file numbers before we release the lock
|
||||
AllocateCompactionOutputFileNumbers(compact);
|
||||
|
||||
@ -1621,14 +1631,25 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
|
||||
#if 0
|
||||
Log(options_.info_log,
|
||||
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
|
||||
"%d smallest_snapshot: %d",
|
||||
"%d smallest_snapshot: %d level: %d bottommost %d",
|
||||
ikey.user_key.ToString().c_str(),
|
||||
(int)ikey.sequence, ikey.type, kTypeValue, drop,
|
||||
compact->compaction->IsBaseLevelForKey(ikey.user_key),
|
||||
(int)last_sequence_for_key, (int)compact->smallest_snapshot);
|
||||
(int)last_sequence_for_key, (int)earliest_snapshot,
|
||||
compact->compaction->level(), bottommost_level);
|
||||
#endif
|
||||
|
||||
if (!drop) {
|
||||
|
||||
// Zeroing out the sequence number leads to better compression.
|
||||
// If this is the bottommost level (no files in lower levels)
|
||||
// and the earliest snapshot is larger than this seqno
|
||||
// then we can squash the seqno to zero.
|
||||
if (bottommost_level && ikey.sequence < earliest_snapshot) {
|
||||
assert(ikey.type != kTypeDeletion);
|
||||
UpdateInternalKey(key, (uint64_t)0, ikey.type);
|
||||
}
|
||||
|
||||
// Open output file if necessary
|
||||
if (compact->builder == NULL) {
|
||||
status = OpenCompactionOutputFile(compact);
|
||||
|
@ -1309,7 +1309,6 @@ TEST(DBTest, RepeatedWritesToSameKey) {
|
||||
for (int i = 0; i < 5 * kMaxFiles; i++) {
|
||||
Put("key", value);
|
||||
ASSERT_LE(TotalTableFiles(), kMaxFiles);
|
||||
fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1372,6 +1371,30 @@ TEST(DBTest, CompactionFilter) {
|
||||
ASSERT_NE(NumTableFilesAtLevel(2), 0);
|
||||
cfilter_count = 0;
|
||||
|
||||
// All the files are in the lowest level.
|
||||
// Verify that all but the 100001st record
|
||||
// has sequence number zero. The 100001st record
|
||||
// is at the tip of this snapshot and cannot
|
||||
// be zeroed out.
|
||||
int count = 0;
|
||||
int total = 0;
|
||||
Iterator* iter = dbfull()->TEST_NewInternalIterator();
|
||||
iter->SeekToFirst();
|
||||
ASSERT_EQ(iter->status().ok(), true);
|
||||
while (iter->Valid()) {
|
||||
ParsedInternalKey ikey;
|
||||
ikey.sequence = -1;
|
||||
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
|
||||
total++;
|
||||
if (ikey.sequence != 0) {
|
||||
count++;
|
||||
}
|
||||
iter->Next();
|
||||
}
|
||||
ASSERT_EQ(total, 100001);
|
||||
ASSERT_EQ(count, 1);
|
||||
delete iter;
|
||||
|
||||
// overwrite all the 100K+1 keys once again.
|
||||
for (int i = 0; i < 100001; i++) {
|
||||
char key[100];
|
||||
@ -1427,15 +1450,32 @@ TEST(DBTest, CompactionFilter) {
|
||||
// 100001th key is left in the db. The 100001th key
|
||||
// is part of the default-most-current snapshot and
|
||||
// cannot be deleted.
|
||||
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||
iter = db_->NewIterator(ReadOptions());
|
||||
iter->SeekToFirst();
|
||||
int count = 0;
|
||||
count = 0;
|
||||
while (iter->Valid()) {
|
||||
count++;
|
||||
iter->Next();
|
||||
}
|
||||
ASSERT_EQ(count, 1);
|
||||
delete iter;
|
||||
|
||||
// The sequence number of the remaining record
|
||||
// is not zeroed out even though it is at the
|
||||
// level Lmax because this record is at the tip of the snapshot.
|
||||
count = 0;
|
||||
iter = dbfull()->TEST_NewInternalIterator();
|
||||
iter->SeekToFirst();
|
||||
ASSERT_EQ(iter->status().ok(), true);
|
||||
while (iter->Valid()) {
|
||||
ParsedInternalKey ikey;
|
||||
ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
|
||||
ASSERT_NE(ikey.sequence, 0);
|
||||
count++;
|
||||
iter->Next();
|
||||
}
|
||||
ASSERT_EQ(count, 1);
|
||||
delete iter;
|
||||
}
|
||||
|
||||
TEST(DBTest, CompactionFilterWithValueChange) {
|
||||
@ -2144,7 +2184,6 @@ TEST(DBTest, NonWritableFileSystem)
|
||||
std::string big(100000, 'x');
|
||||
int errors = 0;
|
||||
for (int i = 0; i < 20; i++) {
|
||||
fprintf(stderr, "iter %d; errors %d\n", i, errors);
|
||||
if (!Put("foo", big).ok()) {
|
||||
errors++;
|
||||
env_->SleepForMicroseconds(100000);
|
||||
|
@ -157,6 +157,16 @@ inline bool ParseInternalKey(const Slice& internal_key,
|
||||
return (c <= static_cast<unsigned char>(kTypeValue));
|
||||
}
|
||||
|
||||
// Update the sequence number in the internal key
|
||||
inline void UpdateInternalKey(const Slice& internal_key,
|
||||
uint64_t seq, ValueType t) {
|
||||
const size_t n = internal_key.size();
|
||||
assert(n >= 8);
|
||||
char* seqtype = (char *)internal_key.data() + n - 8;
|
||||
uint64_t newval = (seq << 8) | t;
|
||||
EncodeFixed64(seqtype, newval);
|
||||
}
|
||||
|
||||
// A helper class useful for DBImpl::Get()
|
||||
class LookupKey {
|
||||
public:
|
||||
|
Loading…
Reference in New Issue
Block a user