Fox db_stress crash by copying keys before changing sequencenum to zero.

Summary:
The compaction process zeros out sequence numbers if the output is
part of the bottommost level.
The Slice is supposed to refer to an immutable data buffer. The
merger that implements the priority queue while reading kvs as
the input of a compaction run reies on this fact. The bug was that
were updating the sequence number of a record in-place and that was
causing suceeding invocations of the merger to return kvs in
arbitrary order of sequence numbers.
The fix is to copy the key to a local memory buffer before setting
its seqno to 0.

Test Plan:
Set Options.purge_redundant_kvs_while_flush = false and then run
db_stress --ops_per_thread=1000 --max_key=320

Reviewers: emayanke, sheki

Reviewed By: emayanke

CC: leveldb

Differential Revision: https://reviews.facebook.net/D9147
This commit is contained in:
Dhruba Borthakur 2013-03-05 21:53:28 -08:00
parent 2fb47d661a
commit afed60938f
2 changed files with 18 additions and 8 deletions

View File

@ -1656,15 +1656,25 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (!drop) { if (!drop) {
char* kptr = (char*)key.data();
std::string kstr;
// Zeroing out the sequence number leads to better compression. // Zeroing out the sequence number leads to better compression.
// If this is the bottommost level (no files in lower levels) // If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno // and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero. // then we can squash the seqno to zero.
if (bottommost_level && ikey.sequence < earliest_snapshot) { if (bottommost_level && ikey.sequence < earliest_snapshot) {
assert(ikey.type != kTypeDeletion); assert(ikey.type != kTypeDeletion);
UpdateInternalKey(key, (uint64_t)0, ikey.type); // make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
kstr.assign(key.data(), key.size());
kptr = (char *)kstr.c_str();
UpdateInternalKey(kptr, key.size(), (uint64_t)0, ikey.type);
} }
Slice newkey(kptr, key.size());
assert((key.clear(), 1)); // we do not need 'key' anymore
// Open output file if necessary // Open output file if necessary
if (compact->builder == nullptr) { if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact); status = OpenCompactionOutputFile(compact);
@ -1673,10 +1683,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
} }
if (compact->builder->NumEntries() == 0) { if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key); compact->current_output()->smallest.DecodeFrom(newkey);
} }
compact->current_output()->largest.DecodeFrom(key); compact->current_output()->largest.DecodeFrom(newkey);
compact->builder->Add(key, value); compact->builder->Add(newkey, value);
// Close output file if it is big enough // Close output file if it is big enough
if (compact->builder->FileSize() >= if (compact->builder->FileSize() >=

View File

@ -158,11 +158,11 @@ inline bool ParseInternalKey(const Slice& internal_key,
} }
// Update the sequence number in the internal key // Update the sequence number in the internal key
inline void UpdateInternalKey(const Slice& internal_key, inline void UpdateInternalKey(char* internal_key,
const size_t internal_key_size,
uint64_t seq, ValueType t) { uint64_t seq, ValueType t) {
const size_t n = internal_key.size(); assert(internal_key_size >= 8);
assert(n >= 8); char* seqtype = internal_key + internal_key_size - 8;
char* seqtype = (char *)internal_key.data() + n - 8;
uint64_t newval = (seq << 8) | t; uint64_t newval = (seq << 8) | t;
EncodeFixed64(seqtype, newval); EncodeFixed64(seqtype, newval);
} }