Add timestamp to delete (#6253)

Summary:
Preliminary user-timestamp support for delete.

If ["a", ts=100] exists, you can delete it by calling `DB::Delete(write_options, key)` in which `write_options.timestamp` points to a `ts` higher than 100.

Implementation
A new ValueType, i.e. `kTypeDeletionWithTimestamp` is added for deletion marker with timestamp.
The reason for a separate `kTypeDeletionWithTimestamp`: RocksDB may drop tombstones (keys with kTypeDeletion) when compacting them to the bottom level. This is OK and useful if timestamp is disabled. When timestamp is enabled, should we still reuse `kTypeDeletion`, we may drop the tombstone with a more recent timestamp, causing deleted keys to re-appear.

Test plan (dev server)
```
make check
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6253

Reviewed By: ltamasi

Differential Revision: D20995328

Pulled By: riversand963

fbshipit-source-id: a9e5c22968ad76f98e3dc6ee0151265a3f0df619
This commit is contained in:
Yanqin Jin 2020-05-28 10:37:57 -07:00 committed by Facebook GitHub Bot
parent e3f953a863
commit 961c7590d6
8 changed files with 297 additions and 16 deletions

View File

@ -1837,8 +1837,30 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) {
WriteBatch batch;
batch.Delete(column_family, key);
if (nullptr == opt.timestamp) {
WriteBatch batch;
Status s = batch.Delete(column_family, key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
const Slice* ts = opt.timestamp;
assert(ts != nullptr);
const size_t ts_sz = ts->size();
constexpr size_t kKeyAndValueLenSize = 11;
constexpr size_t kWriteBatchOverhead =
WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
ts_sz);
Status s = batch.Delete(column_family, key);
if (!s.ok()) {
return s;
}
s = batch.AssignTimestamp(*ts);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}

View File

@ -258,7 +258,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
// prone to bugs causing the same user key with the same sequence number.
// Note that with current timestamp implementation, the same user key can
// have different timestamps and zero sequence number on the bottommost
// level. This will change in the future.
// level. This may change in the future.
if ((!is_prev_key_seqnum_zero || timestamp_size_ > 0) &&
skipping_saved_key &&
CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) {
@ -276,6 +276,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
reseek_done = false;
switch (ikey_.type) {
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.

View File

@ -449,6 +449,65 @@ TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
Close();
}
// Create two L0, and compact them to a new L1. In this test, L1 is L_bottom.
// Two L0s:
// f1 f2
// <a, 1, kTypeValue> <a, 3, kTypeDeletionWithTimestamp>...<b, 2, kTypeValue>
// Since f2.smallest < f1.largest < f2.largest
// f1 and f2 will be the inputs of a real compaction instead of trivial move.
TEST_F(DBBasicTestWithTimestamp, CompactDeletionWithTimestampMarkerToBottom) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.num_levels = 2;
options.level0_file_num_compaction_trigger = 2;
DestroyAndReopen(options);
WriteOptions write_opts;
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "a", "value0"));
ASSERT_OK(Flush());
ts_str = Timestamp(2, 0);
ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Put(write_opts, "b", "value0"));
ts_str = Timestamp(3, 0);
ts = ts_str;
write_opts.timestamp = &ts;
ASSERT_OK(db_->Delete(write_opts, "a"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ReadOptions read_opts;
ts_str = Timestamp(1, 0);
ts = ts_str;
read_opts.timestamp = &ts;
std::string value;
Status s = db_->Get(read_opts, "a", &value);
ASSERT_OK(s);
ASSERT_EQ("value0", value);
ts_str = Timestamp(3, 0);
ts = ts_str;
read_opts.timestamp = &ts;
s = db_->Get(read_opts, "a", &value);
ASSERT_TRUE(s.IsNotFound());
// Time-travel to the past before deletion
ts_str = Timestamp(2, 0);
ts = ts_str;
read_opts.timestamp = &ts;
s = db_->Get(read_opts, "a", &value);
ASSERT_OK(s);
ASSERT_EQ("value0", value);
Close();
}
class DBBasicTestWithTimestampCompressionSettings
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<
@ -536,6 +595,104 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
Close();
}
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutDeleteGet) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
const int kNumKeysPerFile = 1024;
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
BlockBasedTableOptions bbto;
bbto.filter_policy = std::get<0>(GetParam());
bbto.whole_key_filtering = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
const CompressionType comp_type = std::get<1>(GetParam());
#if LZ4_VERSION_NUMBER < 10400 // r124+
if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
return;
}
#endif // LZ4_VERSION_NUMBER >= 10400
if (!ZSTD_Supported() && comp_type == kZSTD) {
return;
}
if (!Zlib_Supported() && comp_type == kZlibCompression) {
return;
}
options.compression = comp_type;
options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
if (comp_type == kZSTD) {
options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
}
options.compression_opts.parallel_threads = std::get<3>(GetParam());
options.target_file_size_base = 1 << 26; // 64MB
DestroyAndReopen(options);
const size_t kNumL0Files =
static_cast<size_t>(Options().level0_file_num_compaction_trigger);
{
// Generate enough L0 files with ts=1 to trigger compaction to L1
std::string ts_str = Timestamp(1, 0);
Slice ts = ts_str;
WriteOptions wopts;
wopts.timestamp = &ts;
for (size_t i = 0; i != kNumL0Files; ++i) {
for (int j = 0; j != kNumKeysPerFile; ++j) {
ASSERT_OK(db_->Put(wopts, Key1(j), "value" + std::to_string(i)));
}
ASSERT_OK(db_->Flush(FlushOptions()));
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Generate another L0 at ts=3
ts_str = Timestamp(3, 0);
ts = ts_str;
wopts.timestamp = &ts;
for (int i = 0; i != kNumKeysPerFile; ++i) {
std::string key_str = Key1(i);
Slice key(key_str);
if ((i % 3) == 0) {
ASSERT_OK(db_->Delete(wopts, key));
} else {
ASSERT_OK(db_->Put(wopts, key, "new_value"));
}
}
ASSERT_OK(db_->Flush(FlushOptions()));
// Populate memtable at ts=5
ts_str = Timestamp(5, 0);
ts = ts_str;
wopts.timestamp = &ts;
for (int i = 0; i != kNumKeysPerFile; ++i) {
std::string key_str = Key1(i);
Slice key(key_str);
if ((i % 3) == 1) {
ASSERT_OK(db_->Delete(wopts, key));
} else if ((i % 3) == 2) {
ASSERT_OK(db_->Put(wopts, key, "new_value_2"));
}
}
}
{
std::string ts_str = Timestamp(6, 0);
Slice ts = ts_str;
ReadOptions ropts;
ropts.timestamp = &ts;
for (uint64_t i = 0; i != static_cast<uint64_t>(kNumKeysPerFile); ++i) {
std::string value;
Status s = db_->Get(ropts, Key1(i), &value);
if ((i % 3) == 2) {
ASSERT_OK(s);
ASSERT_EQ("new_value_2", value);
} else {
ASSERT_TRUE(s.IsNotFound());
}
}
}
}
#ifndef ROCKSDB_LITE
// A class which remembers the name of each flushed file.
class FlushedFileCollector : public EventListener {
@ -870,15 +1027,10 @@ TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
for (size_t i = 0; i != write_ts_list.size(); ++i) {
Slice write_ts = write_ts_list[i];
write_opts.timestamp = &write_ts;
uint64_t key = kMinKey;
do {
for (uint64_t key = kMaxKey; key >= kMinKey; --key) {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
ASSERT_OK(s);
if (key == kMaxKey) {
break;
}
++key;
} while (true);
}
}
}
const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
@ -962,6 +1114,90 @@ INSTANTIATE_TEST_CASE_P(
false))),
::testing::Bool()));
class DBBasicTestWithTsIterTombstones
: public DBBasicTestWithTimestampBase,
public testing::WithParamInterface<
std::tuple<std::shared_ptr<const SliceTransform>,
std::shared_ptr<const FilterPolicy>, int>> {
public:
DBBasicTestWithTsIterTombstones()
: DBBasicTestWithTimestampBase("/db_basic_ts_iter_tombstones") {}
};
TEST_P(DBBasicTestWithTsIterTombstones, ForwardIterDelete) {
constexpr size_t kNumKeysPerFile = 128;
Options options = CurrentOptions();
options.env = env_;
const size_t kTimestampSize = Timestamp(0, 0).size();
TestComparator test_cmp(kTimestampSize);
options.comparator = &test_cmp;
options.prefix_extractor = std::get<0>(GetParam());
options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
BlockBasedTableOptions bbto;
bbto.filter_policy = std::get<1>(GetParam());
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
options.num_levels = std::get<2>(GetParam());
DestroyAndReopen(options);
std::vector<std::string> write_ts_strs = {Timestamp(2, 0), Timestamp(4, 0)};
constexpr uint64_t kMaxKey = 0xffffffffffffffff;
constexpr uint64_t kMinKey = 0xfffffffffffff000;
// Insert kMinKey...kMaxKey
uint64_t key = kMinKey;
WriteOptions write_opts;
Slice ts = write_ts_strs[0];
write_opts.timestamp = &ts;
do {
Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(key));
ASSERT_OK(s);
if (kMaxKey == key) {
break;
}
++key;
} while (true);
// Delete them all
ts = write_ts_strs[1];
write_opts.timestamp = &ts;
for (key = kMaxKey; key >= kMinKey; --key) {
Status s;
if (0 != (key % 2)) {
s = db_->Put(write_opts, Key1(key), "value1" + std::to_string(key));
} else {
s = db_->Delete(write_opts, Key1(key));
}
ASSERT_OK(s);
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
{
std::string read_ts = Timestamp(4, 0);
ts = read_ts;
ReadOptions read_opts;
read_opts.total_order_seek = true;
read_opts.timestamp = &ts;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
size_t count = 0;
key = kMinKey + 1;
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++count, key += 2) {
ASSERT_EQ(Key1(key), iter->key());
ASSERT_EQ("value1" + std::to_string(key), iter->value());
}
ASSERT_EQ((kMaxKey - kMinKey + 1) / 2, count);
}
Close();
}
INSTANTIATE_TEST_CASE_P(
Timestamp, DBBasicTestWithTsIterTombstones,
::testing::Combine(
::testing::Values(
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
std::shared_ptr<const FilterPolicy>(
NewBloomFilterPolicy(10, false)),
std::shared_ptr<const FilterPolicy>(
NewBloomFilterPolicy(20, false))),
::testing::Values(2, 6)));
} // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

View File

@ -23,7 +23,7 @@ namespace ROCKSDB_NAMESPACE {
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
const ValueType kValueTypeForSeek = kTypeBlobIndex;
const ValueType kValueTypeForSeek = kTypeDeletionWithTimestamp;
const ValueType kValueTypeForSeekForPrev = kTypeDeletion;
uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {

View File

@ -69,7 +69,8 @@ enum ValueType : unsigned char {
// generated by WriteUnprepared write policy is not mistakenly read by
// another.
kTypeBeginUnprepareXID = 0x13, // WAL only.
kMaxValue = 0x7F // Not used for storing records.
kTypeDeletionWithTimestamp = 0x14,
kMaxValue = 0x7F // Not used for storing records.
};
// Defined in dbformat.cc
@ -79,7 +80,8 @@ extern const ValueType kValueTypeForSeekForPrev;
// Checks whether a type is an inline value type
// (i.e. a type used in memtable skiplist and sst file datablock).
inline bool IsValueType(ValueType t) {
return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex;
return t <= kTypeMerge || t == kTypeSingleDeletion || t == kTypeBlobIndex ||
kTypeDeletionWithTimestamp == t;
}
// Checks whether a type is from user operation

View File

@ -724,6 +724,7 @@ static bool SaveValue(void* arg, const char* entry) {
return false;
}
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) {

View File

@ -908,7 +908,14 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
if (0 == b->timestamp_size_) {
PutLengthPrefixedSlice(&b->rep_, key);
} else {
PutVarint32(&b->rep_,
static_cast<uint32_t>(key.size() + b->timestamp_size_));
b->rep_.append(key.data(), key.size());
b->rep_.append(b->timestamp_size_, '\0');
}
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
@ -930,7 +937,11 @@ Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, key);
if (0 == b->timestamp_size_) {
PutLengthPrefixedSliceParts(&b->rep_, key);
} else {
PutLengthPrefixedSlicePartsWithPadding(&b->rep_, key, b->timestamp_size_);
}
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
@ -1546,7 +1557,14 @@ class MemTableInserter : public WriteBatch::Handler {
return seek_status;
}
auto ret_status = DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
ColumnFamilyData* cfd = cf_mems_->current();
assert(!cfd || cfd->user_comparator());
const size_t ts_sz = (cfd && cfd->user_comparator())
? cfd->user_comparator()->timestamp_size()
: 0;
const ValueType delete_type =
(0 == ts_sz) ? kTypeDeletion : kTypeDeletionWithTimestamp;
auto ret_status = DeleteImpl(column_family_id, key, Slice(), delete_type);
// optimize for non-recovery mode
if (UNLIKELY(!ret_status.IsTryAgain() && rebuilding_trx_ != nullptr)) {
assert(!write_after_commit_);

View File

@ -303,6 +303,7 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return false;
case kTypeDeletion:
case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion:
case kTypeRangeDeletion:
// TODO(noetzli): Verify correctness once merge of single-deletes