Periodic Compactions (#5166)
Summary: Introducing Periodic Compactions. This feature allows all the files in a CF to be periodically compacted. It could help in catching any corruptions that could creep into the DB proactively as every file is constantly getting re-compacted. And also, of course, it helps to cleanup data older than certain threshold. - Introduced a new option `periodic_compaction_time` to control how long a file can live without being compacted in a CF. - This works across all levels. - The files are put in the same level after going through the compaction. (Related files in the same level are picked up as `ExpandInputstoCleanCut` is used). - Compaction filters, if any, are invoked as usual. - A new table property, `file_creation_time`, is introduced to implement this feature. This property is set to the time at which the SST file was created (and that time is given by the underlying Env/OS). This feature can be enabled on its own, or in conjunction with `ttl`. It is possible to set a different time threshold for the bottom level when used in conjunction with ttl. Since `ttl` works only on 0 to last but one levels, you could set `ttl` to, say, 1 day, and `periodic_compaction_time` to, say, 7 days. Since `ttl < periodic_compaction_time` all files in last but one levels keep getting picked up based on ttl, and almost never based on periodic_compaction_time. The files in the bottom level get picked up for compaction based on `periodic_compaction_time`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5166 Differential Revision: D14884441 Pulled By: sagar0 fbshipit-source-id: 408426cbacb409c06386a98632dcf90bfa1bda47
This commit is contained in:
parent
ef0fc1b461
commit
d3d20dcdca
@ -3,6 +3,8 @@
|
||||
### Unreleased
|
||||
### New Features
|
||||
* When reading from option file/string/map, customized comparators and/or merge operators can be filled according to object registry.
|
||||
* Introduce Periodic Compaction for Level style compaction. Files are re-compacted periodically and put in the same level.
|
||||
|
||||
### Public API Change
|
||||
### Bug Fixes
|
||||
* Fix a bug in 2PC where a sequence of txn prepare, memtable flush, and crash could result in losing the prepared transaction.
|
||||
|
@ -49,7 +49,8 @@ TableBuilder* NewTableBuilder(
|
||||
WritableFileWriter* file, const CompressionType compression_type,
|
||||
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
|
||||
int level, const bool skip_filters, const uint64_t creation_time,
|
||||
const uint64_t oldest_key_time, const uint64_t target_file_size) {
|
||||
const uint64_t oldest_key_time, const uint64_t target_file_size,
|
||||
const uint64_t file_creation_time) {
|
||||
assert((column_family_id ==
|
||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
||||
column_family_name.empty());
|
||||
@ -58,7 +59,8 @@ TableBuilder* NewTableBuilder(
|
||||
int_tbl_prop_collector_factories, compression_type,
|
||||
sample_for_compression, compression_opts,
|
||||
skip_filters, column_family_name, level,
|
||||
creation_time, oldest_key_time, target_file_size),
|
||||
creation_time, oldest_key_time, target_file_size,
|
||||
file_creation_time),
|
||||
column_family_id, file);
|
||||
}
|
||||
|
||||
@ -80,7 +82,7 @@ Status BuildTable(
|
||||
TableFileCreationReason reason, EventLogger* event_logger, int job_id,
|
||||
const Env::IOPriority io_priority, TableProperties* table_properties,
|
||||
int level, const uint64_t creation_time, const uint64_t oldest_key_time,
|
||||
Env::WriteLifeTimeHint write_hint) {
|
||||
Env::WriteLifeTimeHint write_hint, const uint64_t file_creation_time) {
|
||||
assert((column_family_id ==
|
||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
||||
column_family_name.empty());
|
||||
@ -135,7 +137,8 @@ Status BuildTable(
|
||||
int_tbl_prop_collector_factories, column_family_id,
|
||||
column_family_name, file_writer.get(), compression,
|
||||
sample_for_compression, compression_opts_for_flush, level,
|
||||
false /* skip_filters */, creation_time, oldest_key_time);
|
||||
false /* skip_filters */, creation_time, oldest_key_time,
|
||||
0 /*target_file_size*/, file_creation_time);
|
||||
}
|
||||
|
||||
MergeHelper merge(env, internal_comparator.user_comparator(),
|
||||
|
@ -50,7 +50,8 @@ TableBuilder* NewTableBuilder(
|
||||
const uint64_t sample_for_compression,
|
||||
const CompressionOptions& compression_opts, int level,
|
||||
const bool skip_filters = false, const uint64_t creation_time = 0,
|
||||
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0);
|
||||
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0,
|
||||
const uint64_t file_creation_time = 0);
|
||||
|
||||
// Build a Table file from the contents of *iter. The generated file
|
||||
// will be named according to number specified in meta. On success, the rest of
|
||||
@ -80,6 +81,7 @@ extern Status BuildTable(
|
||||
const Env::IOPriority io_priority = Env::IO_HIGH,
|
||||
TableProperties* table_properties = nullptr, int level = -1,
|
||||
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
|
||||
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET);
|
||||
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
|
||||
const uint64_t file_creation_time = 0);
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -97,6 +97,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
|
||||
return "Flush";
|
||||
case CompactionReason::kExternalSstIngestion:
|
||||
return "ExternalSstIngestion";
|
||||
case CompactionReason::kPeriodicCompaction:
|
||||
return "PeriodicCompaction";
|
||||
case CompactionReason::kNumOfReasons:
|
||||
// fall through
|
||||
default:
|
||||
@ -1480,20 +1482,20 @@ Status CompactionJob::OpenCompactionOutputFile(
|
||||
bool skip_filters =
|
||||
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
|
||||
|
||||
uint64_t output_file_creation_time =
|
||||
int64_t temp_current_time = 0;
|
||||
auto get_time_status = env_->GetCurrentTime(&temp_current_time);
|
||||
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
|
||||
if (!get_time_status.ok()) {
|
||||
ROCKS_LOG_WARN(db_options_.info_log,
|
||||
"Failed to get current time. Status: %s",
|
||||
get_time_status.ToString().c_str());
|
||||
}
|
||||
uint64_t current_time = static_cast<uint64_t>(temp_current_time);
|
||||
|
||||
uint64_t latest_key_time =
|
||||
sub_compact->compaction->MaxInputFileCreationTime();
|
||||
if (output_file_creation_time == 0) {
|
||||
int64_t _current_time = 0;
|
||||
auto status = db_options_.env->GetCurrentTime(&_current_time);
|
||||
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
|
||||
if (!status.ok()) {
|
||||
ROCKS_LOG_WARN(
|
||||
db_options_.info_log,
|
||||
"Failed to get current time to populate creation_time property. "
|
||||
"Status: %s",
|
||||
status.ToString().c_str());
|
||||
}
|
||||
output_file_creation_time = static_cast<uint64_t>(_current_time);
|
||||
if (latest_key_time == 0) {
|
||||
latest_key_time = current_time;
|
||||
}
|
||||
|
||||
sub_compact->builder.reset(NewTableBuilder(
|
||||
@ -1503,9 +1505,9 @@ Status CompactionJob::OpenCompactionOutputFile(
|
||||
sub_compact->compaction->output_compression(),
|
||||
0 /*sample_for_compression */,
|
||||
sub_compact->compaction->output_compression_opts(),
|
||||
sub_compact->compaction->output_level(), skip_filters,
|
||||
output_file_creation_time, 0 /* oldest_key_time */,
|
||||
sub_compact->compaction->max_output_file_size()));
|
||||
sub_compact->compaction->output_level(), skip_filters, latest_key_time,
|
||||
0 /* oldest_key_time */, sub_compact->compaction->max_output_file_size(),
|
||||
current_time));
|
||||
LogFlush(db_options_.info_log);
|
||||
return s;
|
||||
}
|
||||
|
@ -1073,6 +1073,9 @@ bool LevelCompactionPicker::NeedsCompaction(
|
||||
if (!vstorage->ExpiredTtlFiles().empty()) {
|
||||
return true;
|
||||
}
|
||||
if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
|
||||
return true;
|
||||
}
|
||||
if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
|
||||
return true;
|
||||
}
|
||||
@ -1141,6 +1144,8 @@ class LevelCompactionBuilder {
|
||||
|
||||
void PickExpiredTtlFiles();
|
||||
|
||||
void PickFilesMarkedForPeriodicCompaction();
|
||||
|
||||
const std::string& cf_name_;
|
||||
VersionStorageInfo* vstorage_;
|
||||
CompactionPicker* compaction_picker_;
|
||||
@ -1203,6 +1208,39 @@ void LevelCompactionBuilder::PickExpiredTtlFiles() {
|
||||
start_level_inputs_.files.clear();
|
||||
}
|
||||
|
||||
void LevelCompactionBuilder::PickFilesMarkedForPeriodicCompaction() {
|
||||
if (vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
|
||||
// If it's being compacted it has nothing to do here.
|
||||
// If this assert() fails that means that some function marked some
|
||||
// files as being_compacted, but didn't call ComputeCompactionScore()
|
||||
assert(!level_file.second->being_compacted);
|
||||
output_level_ = start_level_ = level_file.first;
|
||||
|
||||
if (start_level_ == 0 &&
|
||||
!compaction_picker_->level0_compactions_in_progress()->empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
start_level_inputs_.files = {level_file.second};
|
||||
start_level_inputs_.level = start_level_;
|
||||
return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
|
||||
&start_level_inputs_);
|
||||
};
|
||||
|
||||
for (auto& level_file : vstorage_->FilesMarkedForPeriodicCompaction()) {
|
||||
if (continuation(level_file)) {
|
||||
// found the compaction!
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
start_level_inputs_.files.clear();
|
||||
}
|
||||
|
||||
void LevelCompactionBuilder::SetupInitialFiles() {
|
||||
// Find the compactions by size on all levels.
|
||||
bool skipped_l0_to_base = false;
|
||||
@ -1256,7 +1294,6 @@ void LevelCompactionBuilder::SetupInitialFiles() {
|
||||
if (start_level_inputs_.empty()) {
|
||||
parent_index_ = base_index_ = -1;
|
||||
|
||||
// PickFilesMarkedForCompaction();
|
||||
compaction_picker_->PickFilesMarkedForCompaction(
|
||||
cf_name_, vstorage_, &start_level_, &output_level_, &start_level_inputs_);
|
||||
if (!start_level_inputs_.empty()) {
|
||||
@ -1264,7 +1301,10 @@ void LevelCompactionBuilder::SetupInitialFiles() {
|
||||
compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Bottommost Files Compaction on deleting tombstones
|
||||
if (start_level_inputs_.empty()) {
|
||||
size_t i;
|
||||
for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
|
||||
++i) {
|
||||
@ -1285,11 +1325,23 @@ void LevelCompactionBuilder::SetupInitialFiles() {
|
||||
compaction_reason_ = CompactionReason::kBottommostFiles;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
assert(start_level_inputs_.empty());
|
||||
// TTL Compaction
|
||||
if (start_level_inputs_.empty()) {
|
||||
PickExpiredTtlFiles();
|
||||
if (!start_level_inputs_.empty()) {
|
||||
compaction_reason_ = CompactionReason::kTtl;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic Compaction
|
||||
if (start_level_inputs_.empty()) {
|
||||
PickFilesMarkedForPeriodicCompaction();
|
||||
if (!start_level_inputs_.empty()) {
|
||||
compaction_reason_ = CompactionReason::kPeriodicCompaction;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3517,6 +3517,160 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
|
||||
const int kNumKeysPerFile = 32;
|
||||
const int kNumLevelFiles = 2;
|
||||
const int kValueSize = 100;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
|
||||
options.max_open_files = -1; // needed for ttl compaction
|
||||
env_->time_elapse_only_sleep_ = false;
|
||||
options.env = env_;
|
||||
|
||||
env_->addon_time_.store(0);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
int periodic_compactions = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
|
||||
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||
auto compaction_reason = compaction->compaction_reason();
|
||||
if (compaction_reason == CompactionReason::kPeriodicCompaction) {
|
||||
periodic_compactions++;
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int i = 0; i < kNumLevelFiles; ++i) {
|
||||
for (int j = 0; j < kNumKeysPerFile; ++j) {
|
||||
ASSERT_OK(
|
||||
Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
ASSERT_EQ("2", FilesPerLevel());
|
||||
ASSERT_EQ(0, periodic_compactions);
|
||||
|
||||
// Add 50 hours and do a write
|
||||
env_->addon_time_.fetch_add(50 * 60 * 60);
|
||||
ASSERT_OK(Put("a", "1"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
// Assert that the files stay in the same level
|
||||
ASSERT_EQ("3", FilesPerLevel());
|
||||
// The two old files go through the periodic compaction process
|
||||
ASSERT_EQ(2, periodic_compactions);
|
||||
|
||||
MoveFilesToLevel(1);
|
||||
ASSERT_EQ("0,3", FilesPerLevel());
|
||||
|
||||
// Add another 50 hours and do another write
|
||||
env_->addon_time_.fetch_add(50 * 60 * 60);
|
||||
ASSERT_OK(Put("b", "2"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("1,3", FilesPerLevel());
|
||||
// The three old files now go through the periodic compaction process. 2 + 3.
|
||||
ASSERT_EQ(5, periodic_compactions);
|
||||
|
||||
// Add another 50 hours and do another write
|
||||
env_->addon_time_.fetch_add(50 * 60 * 60);
|
||||
ASSERT_OK(Put("c", "3"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("2,3", FilesPerLevel());
|
||||
// The four old files now go through the periodic compaction process. 5 + 4.
|
||||
ASSERT_EQ(9, periodic_compactions);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
|
||||
const int kNumKeysPerFile = 32;
|
||||
const int kNumLevelFiles = 2;
|
||||
const int kValueSize = 100;
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.ttl = 10 * 60 * 60; // 10 hours
|
||||
options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
|
||||
options.max_open_files = -1; // needed for both periodic and ttl compactions
|
||||
env_->time_elapse_only_sleep_ = false;
|
||||
options.env = env_;
|
||||
|
||||
env_->addon_time_.store(0);
|
||||
DestroyAndReopen(options);
|
||||
|
||||
int periodic_compactions = 0;
|
||||
int ttl_compactions = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
|
||||
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
|
||||
auto compaction_reason = compaction->compaction_reason();
|
||||
if (compaction_reason == CompactionReason::kPeriodicCompaction) {
|
||||
periodic_compactions++;
|
||||
} else if (compaction_reason == CompactionReason::kTtl) {
|
||||
ttl_compactions++;
|
||||
}
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Random rnd(301);
|
||||
for (int i = 0; i < kNumLevelFiles; ++i) {
|
||||
for (int j = 0; j < kNumKeysPerFile; ++j) {
|
||||
ASSERT_OK(
|
||||
Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
|
||||
}
|
||||
Flush();
|
||||
}
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
MoveFilesToLevel(3);
|
||||
|
||||
ASSERT_EQ("0,0,0,2", FilesPerLevel());
|
||||
ASSERT_EQ(0, periodic_compactions);
|
||||
ASSERT_EQ(0, ttl_compactions);
|
||||
|
||||
// Add some time greater than periodic_compaction_time.
|
||||
env_->addon_time_.fetch_add(50 * 60 * 60);
|
||||
ASSERT_OK(Put("a", "1"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
// Files in the bottom level go through periodic compactions.
|
||||
ASSERT_EQ("1,0,0,2", FilesPerLevel());
|
||||
ASSERT_EQ(2, periodic_compactions);
|
||||
ASSERT_EQ(0, ttl_compactions);
|
||||
|
||||
// Add a little more time than ttl
|
||||
env_->addon_time_.fetch_add(11 * 60 * 60);
|
||||
ASSERT_OK(Put("b", "1"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
// Notice that the previous file in level 1 falls down to the bottom level
|
||||
// due to ttl compactions, one level at a time.
|
||||
// And bottom level files don't get picked up for ttl compactions.
|
||||
ASSERT_EQ("1,0,0,3", FilesPerLevel());
|
||||
ASSERT_EQ(2, periodic_compactions);
|
||||
ASSERT_EQ(3, ttl_compactions);
|
||||
|
||||
// Add some time greater than periodic_compaction_time.
|
||||
env_->addon_time_.fetch_add(50 * 60 * 60);
|
||||
ASSERT_OK(Put("c", "1"));
|
||||
Flush();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
// Previous L0 file falls one level at a time to bottom level due to ttl.
|
||||
// And all 4 bottom files go through periodic compactions.
|
||||
ASSERT_EQ("1,0,0,4", FilesPerLevel());
|
||||
ASSERT_EQ(6, periodic_compactions);
|
||||
ASSERT_EQ(6, ttl_compactions);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
|
||||
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
|
||||
// Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
|
||||
// compaction only triggers flush after it's sure stall won't be triggered for
|
||||
|
@ -188,6 +188,20 @@ static Status ValidateOptions(
|
||||
"TTL is only supported in Block-Based Table format. ");
|
||||
}
|
||||
}
|
||||
|
||||
if (cfd.options.periodic_compaction_seconds > 0) {
|
||||
if (db_options.max_open_files != -1) {
|
||||
return Status::NotSupported(
|
||||
"Periodic Compaction is only supported when files are always "
|
||||
"kept open (set max_open_files = -1). ");
|
||||
}
|
||||
if (cfd.options.table_factory->Name() !=
|
||||
BlockBasedTableFactory().Name()) {
|
||||
return Status::NotSupported(
|
||||
"Periodic Compaction is only supported in "
|
||||
"Block-Based Table format. ");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (db_options.db_paths.size() > 4) {
|
||||
|
@ -379,7 +379,7 @@ Status FlushJob::WriteLevel0Table() {
|
||||
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
|
||||
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
|
||||
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
|
||||
oldest_key_time, write_hint);
|
||||
oldest_key_time, write_hint, current_time);
|
||||
LogFlush(db_options_.info_log);
|
||||
}
|
||||
ROCKS_LOG_INFO(db_options_.info_log,
|
||||
|
@ -1705,6 +1705,10 @@ void VersionStorageInfo::ComputeCompactionScore(
|
||||
if (mutable_cf_options.ttl > 0) {
|
||||
ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
|
||||
}
|
||||
if (mutable_cf_options.periodic_compaction_seconds > 0) {
|
||||
ComputeFilesMarkedForPeriodicCompaction(
|
||||
immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
|
||||
}
|
||||
EstimateCompactionBytesNeeded(mutable_cf_options);
|
||||
}
|
||||
|
||||
@ -1758,6 +1762,36 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
|
||||
}
|
||||
}
|
||||
|
||||
void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const uint64_t periodic_compaction_seconds) {
|
||||
assert(periodic_compaction_seconds > 0);
|
||||
|
||||
files_marked_for_periodic_compaction_.clear();
|
||||
|
||||
int64_t temp_current_time;
|
||||
auto status = ioptions.env->GetCurrentTime(&temp_current_time);
|
||||
if (!status.ok()) {
|
||||
return;
|
||||
}
|
||||
const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
|
||||
const uint64_t allowed_time_limit =
|
||||
current_time - periodic_compaction_seconds;
|
||||
|
||||
for (int level = 0; level < num_levels(); level++) {
|
||||
for (auto f : files_[level]) {
|
||||
if (!f->being_compacted && f->fd.table_reader != nullptr &&
|
||||
f->fd.table_reader->GetTableProperties() != nullptr) {
|
||||
auto file_creation_time =
|
||||
f->fd.table_reader->GetTableProperties()->file_creation_time;
|
||||
if (file_creation_time > 0 && file_creation_time < allowed_time_limit) {
|
||||
files_marked_for_periodic_compaction_.emplace_back(level, f);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// used to sort files by size
|
||||
|
@ -139,6 +139,12 @@ class VersionStorageInfo {
|
||||
void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions,
|
||||
const uint64_t ttl);
|
||||
|
||||
// This computes files_marked_for_periodic_compaction_ and is called by
|
||||
// ComputeCompactionScore()
|
||||
void ComputeFilesMarkedForPeriodicCompaction(
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const uint64_t periodic_compaction_seconds);
|
||||
|
||||
// This computes bottommost_files_marked_for_compaction_ and is called by
|
||||
// ComputeCompactionScore() or UpdateOldestSnapshot().
|
||||
//
|
||||
@ -300,6 +306,14 @@ class VersionStorageInfo {
|
||||
return expired_ttl_files_;
|
||||
}
|
||||
|
||||
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
|
||||
// REQUIRES: DB mutex held during access
|
||||
const autovector<std::pair<int, FileMetaData*>>&
|
||||
FilesMarkedForPeriodicCompaction() const {
|
||||
assert(finalized_);
|
||||
return files_marked_for_periodic_compaction_;
|
||||
}
|
||||
|
||||
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
|
||||
// REQUIRES: DB mutex held during access
|
||||
const autovector<std::pair<int, FileMetaData*>>&
|
||||
@ -469,6 +483,9 @@ class VersionStorageInfo {
|
||||
|
||||
autovector<std::pair<int, FileMetaData*>> expired_ttl_files_;
|
||||
|
||||
autovector<std::pair<int, FileMetaData*>>
|
||||
files_marked_for_periodic_compaction_;
|
||||
|
||||
// These files are considered bottommost because none of their keys can exist
|
||||
// at lower levels. They are not necessarily all in the same level. The marked
|
||||
// ones are eligible for compaction because they contain duplicate key
|
||||
|
@ -644,6 +644,16 @@ struct AdvancedColumnFamilyOptions {
|
||||
// Dynamically changeable through SetOptions() API
|
||||
uint64_t ttl = 0;
|
||||
|
||||
// Files older than this value will be picked up for compaction, and
|
||||
// re-written to the same level as they were before.
|
||||
// Only supported in Level compaction.
|
||||
// Pre-req: max_open_file == -1.
|
||||
// unit: seconds. Ex: 7 days = 7 * 24 * 60 * 60
|
||||
// Default: 0 (disabled)
|
||||
//
|
||||
// Dynamically changeable through SetOptions() API
|
||||
uint64_t periodic_compaction_seconds = 0;
|
||||
|
||||
// If this option is set then 1 in N blocks are compressed
|
||||
// using a fast (lz4) and slow (zstd) compression algorithm.
|
||||
// The compressibility is reported as stats and the stored
|
||||
|
@ -89,6 +89,8 @@ enum class CompactionReason : int {
|
||||
kFlush,
|
||||
// Compaction caused by external sst file ingestion
|
||||
kExternalSstIngestion,
|
||||
// Compaction due to SST file being too old
|
||||
kPeriodicCompaction,
|
||||
// total number of compaction reasons, new reasons must be added above this.
|
||||
kNumOfReasons,
|
||||
};
|
||||
|
@ -56,6 +56,7 @@ struct TablePropertiesNames {
|
||||
static const std::string kCompressionOptions;
|
||||
static const std::string kCreationTime;
|
||||
static const std::string kOldestKeyTime;
|
||||
static const std::string kFileCreationTime;
|
||||
};
|
||||
|
||||
extern const std::string kPropertiesBlock;
|
||||
@ -177,11 +178,14 @@ struct TableProperties {
|
||||
// by column_family_name.
|
||||
uint64_t column_family_id =
|
||||
rocksdb::TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
|
||||
// The time when the SST file was created.
|
||||
// Since SST files are immutable, this is equivalent to last modified time.
|
||||
// Timestamp of the latest key. 0 means unknown.
|
||||
// TODO(sagar0): Should be changed to latest_key_time ... but don't know the
|
||||
// full implications of backward compatibility. Hence retaining for now.
|
||||
uint64_t creation_time = 0;
|
||||
// Timestamp of the earliest key. 0 means unknown.
|
||||
uint64_t oldest_key_time = 0;
|
||||
// Actual SST file creation time. 0 means unknown.
|
||||
uint64_t file_creation_time = 0;
|
||||
|
||||
// Name of the column family with which this SST file is associated.
|
||||
// If column family is unknown, `column_family_name` will be an empty string.
|
||||
|
@ -173,6 +173,8 @@ void MutableCFOptions::Dump(Logger* log) const {
|
||||
max_bytes_for_level_multiplier);
|
||||
ROCKS_LOG_INFO(log, " ttl: %" PRIu64,
|
||||
ttl);
|
||||
ROCKS_LOG_INFO(log, " periodic_compaction_seconds: %" PRIu64,
|
||||
periodic_compaction_seconds);
|
||||
std::string result;
|
||||
char buf[10];
|
||||
for (const auto m : max_bytes_for_level_multiplier_additional) {
|
||||
|
@ -151,6 +151,7 @@ struct MutableCFOptions {
|
||||
max_bytes_for_level_base(options.max_bytes_for_level_base),
|
||||
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
|
||||
ttl(options.ttl),
|
||||
periodic_compaction_seconds(options.periodic_compaction_seconds),
|
||||
max_bytes_for_level_multiplier_additional(
|
||||
options.max_bytes_for_level_multiplier_additional),
|
||||
compaction_options_fifo(options.compaction_options_fifo),
|
||||
@ -186,6 +187,7 @@ struct MutableCFOptions {
|
||||
max_bytes_for_level_base(0),
|
||||
max_bytes_for_level_multiplier(0),
|
||||
ttl(0),
|
||||
periodic_compaction_seconds(0),
|
||||
compaction_options_fifo(),
|
||||
max_sequential_skip_in_iterations(0),
|
||||
paranoid_file_checks(false),
|
||||
@ -236,6 +238,7 @@ struct MutableCFOptions {
|
||||
uint64_t max_bytes_for_level_base;
|
||||
double max_bytes_for_level_multiplier;
|
||||
uint64_t ttl;
|
||||
uint64_t periodic_compaction_seconds;
|
||||
std::vector<int> max_bytes_for_level_multiplier_additional;
|
||||
CompactionOptionsFIFO compaction_options_fifo;
|
||||
CompactionOptionsUniversal compaction_options_universal;
|
||||
|
@ -88,6 +88,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
|
||||
force_consistency_checks(options.force_consistency_checks),
|
||||
report_bg_io_stats(options.report_bg_io_stats),
|
||||
ttl(options.ttl),
|
||||
periodic_compaction_seconds(options.periodic_compaction_seconds),
|
||||
sample_for_compression(options.sample_for_compression) {
|
||||
assert(memtable_factory.get() != nullptr);
|
||||
if (max_bytes_for_level_multiplier_additional.size() <
|
||||
@ -352,6 +353,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
|
||||
report_bg_io_stats);
|
||||
ROCKS_LOG_HEADER(log, " Options.ttl: %" PRIu64,
|
||||
ttl);
|
||||
ROCKS_LOG_HEADER(log,
|
||||
" Options.periodic_compaction_seconds: %" PRIu64,
|
||||
periodic_compaction_seconds);
|
||||
} // ColumnFamilyOptions::Dump
|
||||
|
||||
void Options::Dump(Logger* log) const {
|
||||
|
@ -179,6 +179,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
|
||||
cf_opts.max_bytes_for_level_multiplier =
|
||||
mutable_cf_options.max_bytes_for_level_multiplier;
|
||||
cf_opts.ttl = mutable_cf_options.ttl;
|
||||
cf_opts.periodic_compaction_seconds =
|
||||
mutable_cf_options.periodic_compaction_seconds;
|
||||
|
||||
cf_opts.max_bytes_for_level_multiplier_additional.clear();
|
||||
for (auto value :
|
||||
@ -1960,6 +1962,10 @@ std::unordered_map<std::string, OptionTypeInfo>
|
||||
{offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T,
|
||||
OptionVerificationType::kNormal, true,
|
||||
offsetof(struct MutableCFOptions, ttl)}},
|
||||
{"periodic_compaction_seconds",
|
||||
{offset_of(&ColumnFamilyOptions::periodic_compaction_seconds),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal, true,
|
||||
offsetof(struct MutableCFOptions, periodic_compaction_seconds)}},
|
||||
{"sample_for_compression",
|
||||
{offset_of(&ColumnFamilyOptions::sample_for_compression),
|
||||
OptionType::kUInt64T, OptionVerificationType::kNormal, true,
|
||||
|
@ -452,6 +452,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
|
||||
"disable_auto_compactions=false;"
|
||||
"report_bg_io_stats=true;"
|
||||
"ttl=60;"
|
||||
"periodic_compaction_seconds=3600;"
|
||||
"sample_for_compression=0;"
|
||||
"compaction_options_fifo={max_table_files_size=3;allow_"
|
||||
"compaction=false;};",
|
||||
|
@ -351,6 +351,7 @@ struct BlockBasedTableBuilder::Rep {
|
||||
uint64_t creation_time = 0;
|
||||
uint64_t oldest_key_time = 0;
|
||||
const uint64_t target_file_size;
|
||||
uint64_t file_creation_time = 0;
|
||||
|
||||
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
|
||||
|
||||
@ -364,7 +365,8 @@ struct BlockBasedTableBuilder::Rep {
|
||||
const uint64_t _sample_for_compression,
|
||||
const CompressionOptions& _compression_opts, const bool skip_filters,
|
||||
const std::string& _column_family_name, const uint64_t _creation_time,
|
||||
const uint64_t _oldest_key_time, const uint64_t _target_file_size)
|
||||
const uint64_t _oldest_key_time, const uint64_t _target_file_size,
|
||||
const uint64_t _file_creation_time)
|
||||
: ioptions(_ioptions),
|
||||
moptions(_moptions),
|
||||
table_options(table_opt),
|
||||
@ -401,7 +403,8 @@ struct BlockBasedTableBuilder::Rep {
|
||||
column_family_name(_column_family_name),
|
||||
creation_time(_creation_time),
|
||||
oldest_key_time(_oldest_key_time),
|
||||
target_file_size(_target_file_size) {
|
||||
target_file_size(_target_file_size),
|
||||
file_creation_time(_file_creation_time) {
|
||||
if (table_options.index_type ==
|
||||
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
||||
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
||||
@ -453,7 +456,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
|
||||
const uint64_t sample_for_compression,
|
||||
const CompressionOptions& compression_opts, const bool skip_filters,
|
||||
const std::string& column_family_name, const uint64_t creation_time,
|
||||
const uint64_t oldest_key_time, const uint64_t target_file_size) {
|
||||
const uint64_t oldest_key_time, const uint64_t target_file_size,
|
||||
const uint64_t file_creation_time) {
|
||||
BlockBasedTableOptions sanitized_table_options(table_options);
|
||||
if (sanitized_table_options.format_version == 0 &&
|
||||
sanitized_table_options.checksum != kCRC32c) {
|
||||
@ -466,11 +470,12 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
|
||||
sanitized_table_options.format_version = 1;
|
||||
}
|
||||
|
||||
rep_ = new Rep(
|
||||
ioptions, moptions, sanitized_table_options, internal_comparator,
|
||||
int_tbl_prop_collector_factories, column_family_id, file,
|
||||
compression_type, sample_for_compression, compression_opts, skip_filters,
|
||||
column_family_name, creation_time, oldest_key_time, target_file_size);
|
||||
rep_ =
|
||||
new Rep(ioptions, moptions, sanitized_table_options, internal_comparator,
|
||||
int_tbl_prop_collector_factories, column_family_id, file,
|
||||
compression_type, sample_for_compression, compression_opts,
|
||||
skip_filters, column_family_name, creation_time, oldest_key_time,
|
||||
target_file_size, file_creation_time);
|
||||
|
||||
if (rep_->filter_builder != nullptr) {
|
||||
rep_->filter_builder->StartBlock(0);
|
||||
@ -955,6 +960,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
|
||||
rep_->use_delta_encoding_for_index_values;
|
||||
rep_->props.creation_time = rep_->creation_time;
|
||||
rep_->props.oldest_key_time = rep_->oldest_key_time;
|
||||
rep_->props.file_creation_time = rep_->file_creation_time;
|
||||
|
||||
// Add basic properties
|
||||
property_block_builder.AddTableProperty(rep_->props);
|
||||
|
@ -48,7 +48,8 @@ class BlockBasedTableBuilder : public TableBuilder {
|
||||
const uint64_t sample_for_compression,
|
||||
const CompressionOptions& compression_opts, const bool skip_filters,
|
||||
const std::string& column_family_name, const uint64_t creation_time = 0,
|
||||
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0);
|
||||
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0,
|
||||
const uint64_t file_creation_time = 0);
|
||||
|
||||
// REQUIRES: Either Finish() or Abandon() has been called.
|
||||
~BlockBasedTableBuilder();
|
||||
|
@ -220,7 +220,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
|
||||
table_builder_options.column_family_name,
|
||||
table_builder_options.creation_time,
|
||||
table_builder_options.oldest_key_time,
|
||||
table_builder_options.target_file_size);
|
||||
table_builder_options.target_file_size,
|
||||
table_builder_options.file_creation_time);
|
||||
|
||||
return table_builder;
|
||||
}
|
||||
|
@ -89,6 +89,9 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
|
||||
Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id);
|
||||
Add(TablePropertiesNames::kCreationTime, props.creation_time);
|
||||
Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time);
|
||||
if (props.file_creation_time > 0) {
|
||||
Add(TablePropertiesNames::kFileCreationTime, props.file_creation_time);
|
||||
}
|
||||
|
||||
if (!props.filter_policy_name.empty()) {
|
||||
Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name);
|
||||
@ -260,6 +263,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
|
||||
&new_table_properties->creation_time},
|
||||
{TablePropertiesNames::kOldestKeyTime,
|
||||
&new_table_properties->oldest_key_time},
|
||||
{TablePropertiesNames::kFileCreationTime,
|
||||
&new_table_properties->file_creation_time},
|
||||
};
|
||||
|
||||
std::string last_key;
|
||||
|
@ -77,7 +77,8 @@ struct TableBuilderOptions {
|
||||
const CompressionOptions& _compression_opts, bool _skip_filters,
|
||||
const std::string& _column_family_name, int _level,
|
||||
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0,
|
||||
const uint64_t _target_file_size = 0)
|
||||
const uint64_t _target_file_size = 0,
|
||||
const uint64_t _file_creation_time = 0)
|
||||
: ioptions(_ioptions),
|
||||
moptions(_moptions),
|
||||
internal_comparator(_internal_comparator),
|
||||
@ -90,7 +91,8 @@ struct TableBuilderOptions {
|
||||
level(_level),
|
||||
creation_time(_creation_time),
|
||||
oldest_key_time(_oldest_key_time),
|
||||
target_file_size(_target_file_size) {}
|
||||
target_file_size(_target_file_size),
|
||||
file_creation_time(_file_creation_time) {}
|
||||
const ImmutableCFOptions& ioptions;
|
||||
const MutableCFOptions& moptions;
|
||||
const InternalKeyComparator& internal_comparator;
|
||||
@ -105,6 +107,7 @@ struct TableBuilderOptions {
|
||||
const uint64_t creation_time;
|
||||
const int64_t oldest_key_time;
|
||||
const uint64_t target_file_size;
|
||||
const uint64_t file_creation_time;
|
||||
};
|
||||
|
||||
// TableBuilder provides the interface used to build a Table
|
||||
|
@ -163,6 +163,9 @@ std::string TableProperties::ToString(
|
||||
AppendProperty(result, "time stamp of earliest key", oldest_key_time,
|
||||
prop_delim, kv_delim);
|
||||
|
||||
AppendProperty(result, "file creation time", file_creation_time, prop_delim,
|
||||
kv_delim);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -233,6 +236,8 @@ const std::string TablePropertiesNames::kCompressionOptions =
|
||||
const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time";
|
||||
const std::string TablePropertiesNames::kOldestKeyTime =
|
||||
"rocksdb.oldest.key.time";
|
||||
const std::string TablePropertiesNames::kFileCreationTime =
|
||||
"rocksdb.file.creation.time";
|
||||
|
||||
extern const std::string kPropertiesBlock = "rocksdb.properties";
|
||||
// Old property block name for backward compatibility
|
||||
|
@ -346,6 +346,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd) {
|
||||
// uint64_t options
|
||||
static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
|
||||
cf_opt->ttl = uint_max + rnd->Uniform(10000);
|
||||
cf_opt->periodic_compaction_seconds = uint_max + rnd->Uniform(10000);
|
||||
cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000);
|
||||
cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000);
|
||||
cf_opt->max_compaction_bytes =
|
||||
|
Loading…
Reference in New Issue
Block a user