Cascade TTL Compactions to move expired key ranges to bottom levels faster (#5992)
Summary: When users enable Level-Compaction-with-TTL by setting `cf_options.ttl`, TTL-expired data could take up to n*ttl time to reach the bottom level (where n is the number of levels), because of how the `creation_time` table property was computed for files newly created during compaction. The creation time of a new file was set to the maximum of the creation times of all compaction input files, which effectively reset the TTL each time a key range moved down a level. This is now fixed by setting `creation_time` to the minimum of the creation times of all compaction input files. As a result, TTL-expired data triggers cascading compactions across levels, moving the expired key range to the bottom-most level faster and getting rid of tombstones/deleted data sooner.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/5992

Test Plan: `make check`

Differential Revision: D18257883

Pulled By: sagar0

fbshipit-source-id: 00df0bb8d0b7e14d9fc239df2cba8559f3e54cbc
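For context, Level-Compaction-with-TTL is driven entirely by `cf_options.ttl`. A minimal sketch of opening a DB with it enabled, using standard RocksDB APIs; the 24-hour value and the `/tmp/ttl_example` path are illustrative, not from this commit:

```cpp
#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compaction_style = rocksdb::kCompactionStyleLevel;
  // Data older than this becomes eligible for TTL compaction. With this fix,
  // the expired key range cascades to the bottom level instead of aging
  // roughly one ttl per level.
  options.ttl = 24 * 60 * 60;  // 24 hours, illustrative

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/ttl_example", &db);
  assert(s.ok());
  // ... write workload ...
  delete db;
  return 0;
}
```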
@@ -1,10 +1,10 @@
 # Rocksdb Change Log
 ## Unreleased
 ### Public API Change
+* TTL Compactions in Level compaction style now initiate successive cascading compactions on a key range so that it reaches the bottom level quickly on TTL expiry. The `creation_time` table property for compaction output files is now set to the minimum of the creation times of all compaction inputs.
 * Changed the default value of periodic_compaction_seconds to `UINT64_MAX`, which allows RocksDB to auto-tune periodic compaction scheduling. When using the default value, periodic compactions are now auto-enabled if a compaction filter is used. A value of `0` turns the feature off completely.
 * With FIFO compaction style, options.periodic_compaction_seconds will have the same meaning as options.ttl; whichever is stricter will be used. With the default options.periodic_compaction_seconds value and options.ttl's default of 0, RocksDB will give a default of 30 days.
-* Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the
-  file_creation_time of the oldest SST file in the DB.
+* Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the file_creation_time of the oldest SST file in the DB.

 ### New Features
 * Universal compaction now supports options.periodic_compaction_seconds. A full compaction will be triggered if any file is over the threshold.
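The changelog entries above describe user-visible knobs. A hedged sketch of how they might be exercised, assuming `GetCreationTimeOfOldestFile` returns a `Status` like other `DB` getters (only the parameter is spelled out in the entry above); values and the path are illustrative:

```cpp
#include <cstdint>
#include <cstdio>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // UINT64_MAX is now the default: RocksDB auto-tunes periodic compaction
  // scheduling, and auto-enables it when a compaction filter is in use.
  // Setting 0 turns the feature off completely.
  options.periodic_compaction_seconds = UINT64_MAX;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/api_example", &db);
  if (!s.ok()) return 1;

  // New API from this release: creation time of the oldest SST file in the DB.
  uint64_t oldest_file_time = 0;
  s = db->GetCreationTimeOfOldestFile(&oldest_file_time);
  if (s.ok()) {
    std::printf("oldest SST file created at %llu (unix seconds)\n",
                (unsigned long long)oldest_file_time);
  }
  delete db;
  return 0;
}
```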
@@ -545,17 +545,17 @@ bool Compaction::ShouldFormSubcompactions() const {
   }
 }

-uint64_t Compaction::MaxInputFileCreationTime() const {
-  uint64_t max_creation_time = 0;
+uint64_t Compaction::MinInputFileCreationTime() const {
+  uint64_t min_creation_time = port::kMaxUint64;
   for (const auto& file : inputs_[0].files) {
     if (file->fd.table_reader != nullptr &&
         file->fd.table_reader->GetTableProperties() != nullptr) {
       uint64_t creation_time =
           file->fd.table_reader->GetTableProperties()->creation_time;
-      max_creation_time = std::max(max_creation_time, creation_time);
+      min_creation_time = std::min(min_creation_time, creation_time);
     }
   }
-  return max_creation_time;
+  return min_creation_time;
 }

 int Compaction::GetInputBaseLevel() const {
@@ -291,7 +291,7 @@ class Compaction {

   uint32_t max_subcompactions() const { return max_subcompactions_; }

-  uint64_t MaxInputFileCreationTime() const;
+  uint64_t MinInputFileCreationTime() const;

  private:
   // mark (or clear) all files that are being compacted
@@ -1511,10 +1511,9 @@ Status CompactionJob::OpenCompactionOutputFile(
   }
   uint64_t current_time = static_cast<uint64_t>(temp_current_time);

-  uint64_t latest_key_time =
-      sub_compact->compaction->MaxInputFileCreationTime();
-  if (latest_key_time == 0) {
-    latest_key_time = current_time;
+  uint64_t creation_time = sub_compact->compaction->MinInputFileCreationTime();
+  if (creation_time == port::kMaxUint64) {
+    creation_time = current_time;
   }

   sub_compact->builder.reset(NewTableBuilder(
@@ -1524,7 +1523,7 @@ Status CompactionJob::OpenCompactionOutputFile(
       sub_compact->compaction->output_compression(),
       0 /*sample_for_compression */,
       sub_compact->compaction->output_compression_opts(),
-      sub_compact->compaction->output_level(), skip_filters, latest_key_time,
+      sub_compact->compaction->output_level(), skip_filters, creation_time,
       0 /* oldest_key_time */, sub_compact->compaction->max_output_file_size(),
       current_time));
   LogFlush(db_options_.info_log);
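Stripped of the RocksDB plumbing, the selection rule above is: take the minimum creation time across the inputs that report one, and fall back to the current time when none do (the `port::kMaxUint64` sentinel survives only if no input had a table reader with properties). A standalone sketch of that rule with hypothetical names, not code from this commit:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper, for illustration only: choose the creation_time of a
// compaction output file from its inputs' creation times.
uint64_t PickOutputCreationTime(const std::vector<uint64_t>& input_times,
                                uint64_t current_time) {
  // Start from a max sentinel (mirrors port::kMaxUint64 above) and take the
  // minimum over the inputs that supplied a creation_time.
  uint64_t min_time = UINT64_MAX;
  for (uint64_t t : input_times) {
    min_time = std::min(min_time, t);
  }
  // If nothing lowered the sentinel (no inputs reported a time), fall back to
  // the current time, as OpenCompactionOutputFile does above.
  return (min_time == UINT64_MAX) ? current_time : min_time;
}
```

Inheriting the oldest input's creation time is what lets the TTL check fire again at the next level right away, so the expired range keeps being picked for compaction until it reaches the bottom level, instead of having its age reset at each level as under the old max-based rule.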
@@ -3521,6 +3521,93 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }

+TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
+  const int kValueSize = 100;
+
+  Options options = CurrentOptions();
+  options.compression = kNoCompression;
+  options.ttl = 24 * 60 * 60;  // 24 hours
+  options.max_open_files = -1;
+  env_->time_elapse_only_sleep_ = false;
+  options.env = env_;
+
+  env_->addon_time_.store(0);
+  DestroyAndReopen(options);
+
+  int ttl_compactions = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        auto compaction_reason = compaction->compaction_reason();
+        if (compaction_reason == CompactionReason::kTtl) {
+          ttl_compactions++;
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Add two L6 files with key ranges: [1 .. 100], [101 .. 200].
+  Random rnd(301);
+  for (int i = 1; i <= 100; ++i) {
+    ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
+  }
+  Flush();
+  for (int i = 101; i <= 200; ++i) {
+    ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
+  }
+  Flush();
+  MoveFilesToLevel(6);
+  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
+
+  // Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
+  for (int i = 1; i <= 50; ++i) {
+    ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
+  }
+  Flush();
+  for (int i = 51; i <= 150; ++i) {
+    ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
+  }
+  Flush();
+  MoveFilesToLevel(4);
+  ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
+
+  // Add one L1 file with key range: [26, 75].
+  for (int i = 26; i <= 75; ++i) {
+    ASSERT_OK(Put(Key(i), RandomString(&rnd, kValueSize)));
+  }
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  MoveFilesToLevel(1);
+  ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
+
+  // LSM tree:
+  // L1: [26 .. 75]
+  // L4: [1 .. 50][51 ..... 150]
+  // L6: [1 ........ 100][101 .... 200]
+  //
+  // On TTL expiry, TTL compaction should be initiated on L1 file, and the
+  // compactions should keep going on until the key range hits bottom level.
+  // In other words: the compaction on this data range "cascades" until
+  // reaching the bottom level.
+  //
+  // Order of events on TTL expiry:
+  // 1. L1 file falls to L3 via 2 trivial moves which are initiated by the ttl
+  //    compaction.
+  // 2. A TTL compaction happens between L3 and L4 files. Output file in L4.
+  // 3. The new output file from L4 falls to L5 via 1 trivial move initiated
+  //    by the ttl compaction.
+  // 4. A TTL compaction happens between L5 and L6 files. Output in L6.
+
+  // Add 25 hours and do a write
+  env_->addon_time_.fetch_add(25 * 60 * 60);
+  ASSERT_OK(Put("a", "1"));
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
+  ASSERT_EQ(5, ttl_compactions);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
   const int kNumKeysPerFile = 32;
   const int kNumLevelFiles = 2;
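The test counts TTL-reason compactions through a sync-point callback, which only exists in test builds. Outside of tests, an application could observe the same cascade by counting compactions whose reason is `CompactionReason::kTtl` via an event listener; a sketch under the assumption that `CompactionJobInfo::compaction_reason` is populated the same way the test sees it, with an illustrative path and TTL value:

```cpp
#include <atomic>
#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

// Counts completed compactions whose reason is kTtl, similar in spirit to the
// sync-point callback in the test above, but usable in a normal application.
class TtlCompactionCounter : public rocksdb::EventListener {
 public:
  void OnCompactionCompleted(rocksdb::DB* /*db*/,
                             const rocksdb::CompactionJobInfo& info) override {
    if (info.compaction_reason == rocksdb::CompactionReason::kTtl) {
      ttl_compactions_.fetch_add(1);
    }
  }
  int count() const { return ttl_compactions_.load(); }

 private:
  std::atomic<int> ttl_compactions_{0};
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.ttl = 24 * 60 * 60;  // illustrative, matches the test above
  auto counter = std::make_shared<TtlCompactionCounter>();
  options.listeners.push_back(counter);

  rocksdb::DB* db = nullptr;
  if (!rocksdb::DB::Open(options, "/tmp/ttl_listener_example", &db).ok()) {
    return 1;
  }
  // ... workload; after TTL expiry, counter->count() should reflect the
  // cascading compactions described in the test comments above ...
  delete db;
  return 0;
}
```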