Fix a bug in timestamp-related GC (#9116)
Summary: For multiple versions (ts + seq) of the same user key, if they cross the boundary of `full_history_ts_low_`, we should retain the version that is visible to the `full_history_ts_low_`. Namely, we keep the internal key with the largest timestamp smaller than `full_history_ts_low`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9116 Test Plan: make check Reviewed By: ltamasi Differential Revision: D32261514 Pulled By: riversand963 fbshipit-source-id: e10f47c254c04c05261440051e4f50cb7d95474e
This commit is contained in:
parent
2fbe32b0c1
commit
a113cecfc9
@ -12,6 +12,7 @@
|
||||
* Fixed a bug in CompactionIterator when write-preared transaction is used. Releasing earliest_snapshot during compaction may cause a SingleDelete to be output after a PUT of the same user key whose seq has been zeroed.
|
||||
* Added input sanitization on negative bytes passed into `GenericRateLimiter::Request`.
|
||||
* Fixed an assertion failure in CompactionIterator when write-prepared transaction is used. We prove that certain operations can lead to a Delete being followed by a SingleDelete (same user key). We can drop the SingleDelete.
|
||||
* Fixed a bug of timestamp-based GC which can cause all versions of a key under full_history_ts_low to be dropped. This bug will be triggered when some of the ikeys' timestamps are lower than full_history_ts_low, while others are newer.
|
||||
|
||||
### Behavior Changes
|
||||
* `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.
|
||||
|
@ -407,6 +407,12 @@ void CompactionIterator::NextFromInput() {
|
||||
// Copy key for output
|
||||
key_ = current_key_.SetInternalKey(key_, &ikey_);
|
||||
|
||||
int prev_cmp_with_ts_low =
|
||||
!full_history_ts_low_ ? 0
|
||||
: curr_ts_.empty()
|
||||
? 0
|
||||
: cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_);
|
||||
|
||||
// If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
|
||||
// in next iteration to compare with the timestamp of next key.
|
||||
UpdateTimestampAndCompareWithFullHistoryLow();
|
||||
@ -416,14 +422,16 @@ void CompactionIterator::NextFromInput() {
|
||||
// (2) timestamp is disabled, OR
|
||||
// (3) all history will be preserved, OR
|
||||
// (4) user key (excluding timestamp) is different from previous key, OR
|
||||
// (5) timestamp is NO older than *full_history_ts_low_
|
||||
// (5) timestamp is NO older than *full_history_ts_low_, OR
|
||||
// (6) timestamp is the largest one older than full_history_ts_low_,
|
||||
// then current_user_key_ must be treated as a different user key.
|
||||
// This means, if a user key (excluding ts) is the same as the previous
|
||||
// user key, and its ts is older than *full_history_ts_low_, then we
|
||||
// consider this key for GC, e.g. it may be dropped if certain conditions
|
||||
// match.
|
||||
if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
|
||||
!user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0) {
|
||||
!user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 ||
|
||||
prev_cmp_with_ts_low >= 0) {
|
||||
// Initialize for future comparison for rule (A) and etc.
|
||||
current_user_key_sequence_ = kMaxSequenceNumber;
|
||||
current_user_key_snapshot_ = 0;
|
||||
|
@ -1171,9 +1171,10 @@ TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
|
||||
std::string full_history_ts_low;
|
||||
// Keys whose timestamps larger than or equal to 102 will be preserved.
|
||||
PutFixed64(&full_history_ts_low, 102);
|
||||
const std::vector<std::string> expected_keys = {input_keys[0],
|
||||
input_keys[1]};
|
||||
const std::vector<std::string> expected_values = {"", "a2"};
|
||||
const std::vector<std::string> expected_keys = {
|
||||
input_keys[0], input_keys[1], input_keys[2]};
|
||||
const std::vector<std::string> expected_values = {"", input_values[1],
|
||||
input_values[2]};
|
||||
RunTest(input_keys, input_values, expected_keys, expected_values,
|
||||
/*last_committed_seq=*/kMaxSequenceNumber,
|
||||
/*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
|
||||
|
@ -1398,6 +1398,7 @@ TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
|
||||
|
||||
auto expected_results =
|
||||
mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
|
||||
{KeyStr("a", 0, ValueType::kTypeValue, 0), "a3"},
|
||||
{KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
|
||||
const auto& files = cfd_->current()->storage_info()->LevelFiles(0);
|
||||
|
||||
|
@ -287,6 +287,51 @@ TEST_F(DBBasicTestWithTimestamp, CompactRangeWithSpecifiedRange) {
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, GcPreserveLatestVersionBelowFullHistoryLow) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
const size_t kTimestampSize = Timestamp(0, 0).size();
|
||||
TestComparator test_cmp(kTimestampSize);
|
||||
options.comparator = &test_cmp;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
std::string ts_str = Timestamp(1, 0);
|
||||
WriteOptions wopts;
|
||||
Slice ts = ts_str;
|
||||
wopts.timestamp = &ts;
|
||||
ASSERT_OK(db_->Put(wopts, "k1", "v1"));
|
||||
ASSERT_OK(db_->Put(wopts, "k2", "v2"));
|
||||
ASSERT_OK(db_->Put(wopts, "k3", "v3"));
|
||||
|
||||
ts_str = Timestamp(2, 0);
|
||||
ts = ts_str;
|
||||
wopts.timestamp = &ts;
|
||||
ASSERT_OK(db_->Delete(wopts, "k3"));
|
||||
|
||||
ts_str = Timestamp(4, 0);
|
||||
ts = ts_str;
|
||||
wopts.timestamp = &ts;
|
||||
ASSERT_OK(db_->Put(wopts, "k1", "v5"));
|
||||
|
||||
ts_str = Timestamp(3, 0);
|
||||
ts = ts_str;
|
||||
CompactRangeOptions cro;
|
||||
cro.full_history_ts_low = &ts;
|
||||
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
|
||||
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
ReadOptions ropts;
|
||||
ropts.timestamp = &ts;
|
||||
std::string value;
|
||||
Status s = db_->Get(ropts, "k1", &value);
|
||||
ASSERT_OK(s);
|
||||
ASSERT_EQ("v1", value);
|
||||
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
@ -324,6 +369,7 @@ TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
// TODO return a non-ok for read ts < current_ts_low and test it.
|
||||
for (int i = 0; i < 10; i++) {
|
||||
ReadOptions read_opts;
|
||||
std::string ts_str = Timestamp(i, 0);
|
||||
@ -331,7 +377,7 @@ TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
|
||||
read_opts.timestamp = &ts;
|
||||
std::string value;
|
||||
Status status = db_->Get(read_opts, kKey, &value);
|
||||
if (i < current_ts_low) {
|
||||
if (i < current_ts_low - 1) {
|
||||
ASSERT_TRUE(status.IsNotFound());
|
||||
} else {
|
||||
ASSERT_OK(status);
|
||||
@ -358,19 +404,16 @@ TEST_F(DBBasicTestWithTimestamp, UpdateFullHistoryTsLow) {
|
||||
result_ts_low = cfd->GetFullHistoryTsLow();
|
||||
ASSERT_TRUE(test_cmp.CompareTimestamp(ts_low, result_ts_low) == 0);
|
||||
|
||||
for (int i = 0; i < 20; i++) {
|
||||
// TODO return a non-ok for read ts < current_ts_low and test it.
|
||||
for (int i = current_ts_low; i < 20; i++) {
|
||||
ReadOptions read_opts;
|
||||
std::string ts_str = Timestamp(i, 0);
|
||||
Slice ts = ts_str;
|
||||
read_opts.timestamp = &ts;
|
||||
std::string value;
|
||||
Status status = db_->Get(read_opts, kKey, &value);
|
||||
if (i < current_ts_low) {
|
||||
ASSERT_TRUE(status.IsNotFound());
|
||||
} else {
|
||||
ASSERT_OK(status);
|
||||
ASSERT_TRUE(value.compare(Key(i)) == 0);
|
||||
}
|
||||
ASSERT_OK(status);
|
||||
ASSERT_TRUE(value.compare(Key(i)) == 0);
|
||||
}
|
||||
|
||||
// Test invalid compaction with range
|
||||
|
Loading…
x
Reference in New Issue
Block a user