rocksdb/db/dbformat.cc
Andres Noetzli 014fd55adc Support for SingleDelete()
Summary:
This patch fixes #7460559. It introduces SingleDelete as a new database
operation. This operation can be used to delete keys that were never
overwritten (no put following another put of the same key). If an overwritten
key is single deleted the behavior is undefined. Single deletion of a
non-existent key has no effect but multiple consecutive single deletions are
not allowed (see limitations).

In contrast to the conventional Delete() operation, the deletion entry is
removed along with the value when the two are lined up in a compaction. Note:
The semantics are similar to @igor's prototype that allowed to have this
behavior on the granularity of a column family (
https://reviews.facebook.net/D42093 ). This new patch, however, is more
aggressive when it comes to removing tombstones: It removes the SingleDelete
together with the value whenever there is no snapshot between them while the
older patch only did this when the sequence number of the deletion was older
than the earliest snapshot.

Most of the complex additions are in the Compaction Iterator, all other changes
should be relatively straightforward. The patch also includes basic support for
single deletions in db_stress and db_bench.

Limitations:
- Not compatible with cuckoo hash tables
- Single deletions cannot be used in combination with merges and normal
  deletions on the same key (other keys are not affected by this)
- Consecutive single deletions are currently not allowed (and older version of
  this patch supported this so it could be resurrected if needed)

Test Plan: make all check

Reviewers: yhchiang, sdong, rven, anthony, yoshinorim, igor

Reviewed By: igor

Subscribers: maykov, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43179
2015-09-17 11:42:56 -07:00

163 lines
5.2 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/dbformat.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <stdio.h>
#include "port/port.h"
#include "util/coding.h"
#include "util/perf_context_imp.h"
namespace rocksdb {
uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(IsValueType(t));
return (seq << 8) | t;
}
void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) {
*seq = packed >> 8;
*t = static_cast<ValueType>(packed & 0xff);
assert(*seq <= kMaxSequenceNumber);
assert(IsValueType(*t));
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
std::string ParsedInternalKey::DebugString(bool hex) const {
char buf[50];
snprintf(buf, sizeof(buf), "' @ %" PRIu64 ": %d", sequence,
static_cast<int>(type));
std::string result = "'";
result += user_key.ToString(hex);
result += buf;
return result;
}
std::string InternalKey::DebugString(bool hex) const {
std::string result;
ParsedInternalKey parsed;
if (ParseInternalKey(rep_, &parsed)) {
result = parsed.DebugString(hex);
} else {
result = "(bad)";
result.append(EscapeString(rep_));
}
return result;
}
const char* InternalKeyComparator::Name() const {
return name_.c_str();
}
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
int InternalKeyComparator::Compare(const ParsedInternalKey& a,
const ParsedInternalKey& b) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(a.user_key, b.user_key);
PERF_COUNTER_ADD(user_key_comparison_count, 1);
if (r == 0) {
if (a.sequence > b.sequence) {
r = -1;
} else if (a.sequence < b.sequence) {
r = +1;
} else if (a.type > b.type) {
r = -1;
} else if (a.type < b.type) {
r = +1;
}
}
return r;
}
void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {
// Attempt to shorten the user portion of the key
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}
}
void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp, PackSequenceAndType(kMaxSequenceNumber,kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}
}
LookupKey::LookupKey(const Slice& _user_key, SequenceNumber s) {
size_t usize = _user_key.size();
size_t needed = usize + 13; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
// NOTE: We don't support users keys of more than 2GB :)
dst = EncodeVarint32(dst, static_cast<uint32_t>(usize + 8));
kstart_ = dst;
memcpy(dst, _user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}
} // namespace rocksdb