Periodic Compactions (#5166)

Summary:
Introducing Periodic Compactions.

This feature allows all the files in a CF to be periodically compacted. Because every file is regularly re-compacted, it can help proactively catch any corruption that may have crept into the DB. It also, of course, helps clean up data older than a certain threshold.

- Introduced a new option `periodic_compaction_seconds` to control how long a file can live without being compacted in a CF.
- This works across all levels.
- The files are put in the same level after going through the compaction. (Related files in the same level are picked up as well, since `ExpandInputsToCleanCut` is used.)
- Compaction filters, if any, are invoked as usual.
- A new table property, `file_creation_time`, is introduced to implement this feature. This property is set to the time at which the SST file was created (and that time is given by the underlying Env/OS).

This feature can be enabled on its own, or in conjunction with `ttl`. When used together with `ttl`, it is possible to set a different time threshold for the bottom level. Since `ttl` applies only to levels 0 through the second-to-last level, you could set `ttl` to, say, 1 day, and `periodic_compaction_seconds` to, say, 7 days. Because `ttl < periodic_compaction_seconds`, files in all levels except the bottom one keep getting picked up based on `ttl`, and almost never based on `periodic_compaction_seconds`. The files in the bottom level get picked up for compaction based on `periodic_compaction_seconds`.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5166

Differential Revision: D14884441

Pulled By: sagar0

fbshipit-source-id: 408426cbacb409c06386a98632dcf90bfa1bda47
This commit is contained in:
Sagar Vemuri 2019-04-10 19:24:25 -07:00
parent 628a7fd74b
commit 6aba4c6a9b
25 changed files with 373 additions and 38 deletions

View File

@ -1,5 +1,9 @@
# Rocksdb Change Log # Rocksdb Change Log
## 6.0.3 (4/30/2019)
### New Features
* Introduce Periodic Compaction for Level style compaction. Files are re-compacted periodically and put in the same level.
## 6.0.2 (4/23/2019) ## 6.0.2 (4/23/2019)
## Bug Fixes ## Bug Fixes
* Fix build failures due to missing JEMALLOC_CXX_THROW macro (#5053) * Fix build failures due to missing JEMALLOC_CXX_THROW macro (#5053)

View File

@ -49,7 +49,8 @@ TableBuilder* NewTableBuilder(
WritableFileWriter* file, const CompressionType compression_type, WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level, const CompressionOptions& compression_opts, int level,
const bool skip_filters, const uint64_t creation_time, const bool skip_filters, const uint64_t creation_time,
const uint64_t oldest_key_time, const uint64_t target_file_size) { const uint64_t oldest_key_time, const uint64_t target_file_size,
const uint64_t file_creation_time) {
assert((column_family_id == assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty()); column_family_name.empty());
@ -58,7 +59,7 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories, compression_type, int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters, column_family_name, compression_opts, skip_filters, column_family_name,
level, creation_time, oldest_key_time, level, creation_time, oldest_key_time,
target_file_size), target_file_size, file_creation_time),
column_family_id, file); column_family_id, file);
} }
@ -79,7 +80,8 @@ Status BuildTable(
InternalStats* internal_stats, TableFileCreationReason reason, InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time, TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) { const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
const uint64_t file_creation_time) {
assert((column_family_id == assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty()); column_family_name.empty());
@ -134,7 +136,7 @@ Status BuildTable(
int_tbl_prop_collector_factories, column_family_id, int_tbl_prop_collector_factories, column_family_id,
column_family_name, file_writer.get(), compression, column_family_name, file_writer.get(), compression,
compression_opts_for_flush, level, false /* skip_filters */, compression_opts_for_flush, level, false /* skip_filters */,
creation_time, oldest_key_time); creation_time, oldest_key_time, file_creation_time);
} }
MergeHelper merge(env, internal_comparator.user_comparator(), MergeHelper merge(env, internal_comparator.user_comparator(),

View File

@ -49,7 +49,8 @@ TableBuilder* NewTableBuilder(
WritableFileWriter* file, const CompressionType compression_type, WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level, const CompressionOptions& compression_opts, int level,
const bool skip_filters = false, const uint64_t creation_time = 0, const bool skip_filters = false, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0); const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0,
const uint64_t file_creation_time = 0);
// Build a Table file from the contents of *iter. The generated file // Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of // will be named according to number specified in meta. On success, the rest of
@ -78,6 +79,7 @@ extern Status BuildTable(
const Env::IOPriority io_priority = Env::IO_HIGH, const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1, TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET); Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
const uint64_t file_creation_time = 0);
} // namespace rocksdb } // namespace rocksdb

View File

@ -97,6 +97,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
return "Flush"; return "Flush";
case CompactionReason::kExternalSstIngestion: case CompactionReason::kExternalSstIngestion:
return "ExternalSstIngestion"; return "ExternalSstIngestion";
case CompactionReason::kPeriodicCompaction:
return "PeriodicCompaction";
case CompactionReason::kNumOfReasons: case CompactionReason::kNumOfReasons:
// fall through // fall through
default: default:
@ -1478,20 +1480,20 @@ Status CompactionJob::OpenCompactionOutputFile(
bool skip_filters = bool skip_filters =
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
uint64_t output_file_creation_time = int64_t temp_current_time = 0;
auto get_time_status = env_->GetCurrentTime(&temp_current_time);
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
if (!get_time_status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time. Status: %s",
get_time_status.ToString().c_str());
}
uint64_t current_time = static_cast<uint64_t>(temp_current_time);
uint64_t latest_key_time =
sub_compact->compaction->MaxInputFileCreationTime(); sub_compact->compaction->MaxInputFileCreationTime();
if (output_file_creation_time == 0) { if (latest_key_time == 0) {
int64_t _current_time = 0; latest_key_time = current_time;
auto status = db_options_.env->GetCurrentTime(&_current_time);
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
if (!status.ok()) {
ROCKS_LOG_WARN(
db_options_.info_log,
"Failed to get current time to populate creation_time property. "
"Status: %s",
status.ToString().c_str());
}
output_file_creation_time = static_cast<uint64_t>(_current_time);
} }
sub_compact->builder.reset(NewTableBuilder( sub_compact->builder.reset(NewTableBuilder(
@ -1500,9 +1502,9 @@ Status CompactionJob::OpenCompactionOutputFile(
cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(), sub_compact->compaction->output_compression(),
sub_compact->compaction->output_compression_opts(), sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), skip_filters, sub_compact->compaction->output_level(), skip_filters, latest_key_time,
output_file_creation_time, 0 /* oldest_key_time */, 0 /* oldest_key_time */, sub_compact->compaction->max_output_file_size(),
sub_compact->compaction->max_output_file_size())); current_time));
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
return s; return s;
} }

View File

@ -1073,6 +1073,9 @@ bool LevelCompactionPicker::NeedsCompaction(
if (!vstorage->ExpiredTtlFiles().empty()) { if (!vstorage->ExpiredTtlFiles().empty()) {
return true; return true;
} }
if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
return true;
}
if (!vstorage->BottommostFilesMarkedForCompaction().empty()) { if (!vstorage->BottommostFilesMarkedForCompaction().empty()) {
return true; return true;
} }
@ -1141,6 +1144,8 @@ class LevelCompactionBuilder {
void PickExpiredTtlFiles(); void PickExpiredTtlFiles();
void PickFilesMarkedForPeriodicCompaction();
const std::string& cf_name_; const std::string& cf_name_;
VersionStorageInfo* vstorage_; VersionStorageInfo* vstorage_;
CompactionPicker* compaction_picker_; CompactionPicker* compaction_picker_;
@ -1203,6 +1208,39 @@ void LevelCompactionBuilder::PickExpiredTtlFiles() {
start_level_inputs_.files.clear(); start_level_inputs_.files.clear();
} }
void LevelCompactionBuilder::PickFilesMarkedForPeriodicCompaction() {
if (vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
return;
}
auto continuation = [&](std::pair<int, FileMetaData*> level_file) {
// If it's being compacted it has nothing to do here.
// If this assert() fails that means that some function marked some
// files as being_compacted, but didn't call ComputeCompactionScore()
assert(!level_file.second->being_compacted);
output_level_ = start_level_ = level_file.first;
if (start_level_ == 0 &&
!compaction_picker_->level0_compactions_in_progress()->empty()) {
return false;
}
start_level_inputs_.files = {level_file.second};
start_level_inputs_.level = start_level_;
return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&start_level_inputs_);
};
for (auto& level_file : vstorage_->FilesMarkedForPeriodicCompaction()) {
if (continuation(level_file)) {
// found the compaction!
return;
}
}
start_level_inputs_.files.clear();
}
void LevelCompactionBuilder::SetupInitialFiles() { void LevelCompactionBuilder::SetupInitialFiles() {
// Find the compactions by size on all levels. // Find the compactions by size on all levels.
bool skipped_l0_to_base = false; bool skipped_l0_to_base = false;
@ -1256,7 +1294,6 @@ void LevelCompactionBuilder::SetupInitialFiles() {
if (start_level_inputs_.empty()) { if (start_level_inputs_.empty()) {
parent_index_ = base_index_ = -1; parent_index_ = base_index_ = -1;
// PickFilesMarkedForCompaction();
compaction_picker_->PickFilesMarkedForCompaction( compaction_picker_->PickFilesMarkedForCompaction(
cf_name_, vstorage_, &start_level_, &output_level_, &start_level_inputs_); cf_name_, vstorage_, &start_level_, &output_level_, &start_level_inputs_);
if (!start_level_inputs_.empty()) { if (!start_level_inputs_.empty()) {
@ -1264,7 +1301,10 @@ void LevelCompactionBuilder::SetupInitialFiles() {
compaction_reason_ = CompactionReason::kFilesMarkedForCompaction; compaction_reason_ = CompactionReason::kFilesMarkedForCompaction;
return; return;
} }
}
// Bottommost Files Compaction on deleting tombstones
if (start_level_inputs_.empty()) {
size_t i; size_t i;
for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size(); for (i = 0; i < vstorage_->BottommostFilesMarkedForCompaction().size();
++i) { ++i) {
@ -1285,11 +1325,23 @@ void LevelCompactionBuilder::SetupInitialFiles() {
compaction_reason_ = CompactionReason::kBottommostFiles; compaction_reason_ = CompactionReason::kBottommostFiles;
return; return;
} }
}
assert(start_level_inputs_.empty()); // TTL Compaction
if (start_level_inputs_.empty()) {
PickExpiredTtlFiles(); PickExpiredTtlFiles();
if (!start_level_inputs_.empty()) { if (!start_level_inputs_.empty()) {
compaction_reason_ = CompactionReason::kTtl; compaction_reason_ = CompactionReason::kTtl;
return;
}
}
// Periodic Compaction
if (start_level_inputs_.empty()) {
PickFilesMarkedForPeriodicCompaction();
if (!start_level_inputs_.empty()) {
compaction_reason_ = CompactionReason::kPeriodicCompaction;
return;
} }
} }
} }

View File

@ -3515,6 +3515,160 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
// Verifies that, with periodic_compaction_seconds set, files older than the
// threshold are picked up with CompactionReason::kPeriodicCompaction and are
// rewritten into the level they already live in.
TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
  options.max_open_files = -1;  // needed for periodic compaction
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Count every picked compaction whose reason is kPeriodicCompaction.
  int periodic_compactions = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kPeriodicCompaction) {
          periodic_compactions++;
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    // Check the returned Status so a failed flush is reported as such rather
    // than as a confusing file-count mismatch further down.
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ("2", FilesPerLevel());
  ASSERT_EQ(0, periodic_compactions);

  // Add 50 hours and do a write
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("a", "1"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Assert that the files stay in the same level
  ASSERT_EQ("3", FilesPerLevel());
  // The two old files go through the periodic compaction process
  ASSERT_EQ(2, periodic_compactions);

  MoveFilesToLevel(1);
  ASSERT_EQ("0,3", FilesPerLevel());

  // Add another 50 hours and do another write
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("b", "2"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("1,3", FilesPerLevel());
  // The three old files now go through the periodic compaction process. 2 + 3.
  ASSERT_EQ(5, periodic_compactions);

  // Add another 50 hours and do another write
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("c", "3"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("2,3", FilesPerLevel());
  // The four old files now go through the periodic compaction process. 5 + 4.
  ASSERT_EQ(9, periodic_compactions);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// Verifies the interplay of `ttl` and `periodic_compaction_seconds`: with
// ttl < periodic_compaction_seconds, files above the bottom level are moved
// down by kTtl compactions, while bottom-level files are only ever picked up
// with reason kPeriodicCompaction.
TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 2;
  const int kValueSize = 100;

  Options options = CurrentOptions();
  options.ttl = 10 * 60 * 60;                          // 10 hours
  options.periodic_compaction_seconds = 48 * 60 * 60;  // 2 days
  options.max_open_files = -1;  // needed for both periodic and ttl compactions
  env_->time_elapse_only_sleep_ = false;
  options.env = env_;

  env_->addon_time_.store(0);
  DestroyAndReopen(options);

  // Count picked compactions per reason.
  int periodic_compactions = 0;
  int ttl_compactions = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kPeriodicCompaction) {
          periodic_compactions++;
        } else if (compaction_reason == CompactionReason::kTtl) {
          ttl_compactions++;
        }
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
    }
    // Check the returned Status so a failed flush is reported as such rather
    // than as a confusing file-count mismatch further down.
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  MoveFilesToLevel(3);

  ASSERT_EQ("0,0,0,2", FilesPerLevel());
  ASSERT_EQ(0, periodic_compactions);
  ASSERT_EQ(0, ttl_compactions);

  // Add some time greater than periodic_compaction_seconds.
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("a", "1"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Files in the bottom level go through periodic compactions.
  ASSERT_EQ("1,0,0,2", FilesPerLevel());
  ASSERT_EQ(2, periodic_compactions);
  ASSERT_EQ(0, ttl_compactions);

  // Add a little more time than ttl
  env_->addon_time_.fetch_add(11 * 60 * 60);
  ASSERT_OK(Put("b", "1"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Notice that the previous L0 file falls down to the bottom level
  // due to ttl compactions, one level at a time.
  // And bottom level files don't get picked up for ttl compactions.
  ASSERT_EQ("1,0,0,3", FilesPerLevel());
  ASSERT_EQ(2, periodic_compactions);
  ASSERT_EQ(3, ttl_compactions);

  // Add some time greater than periodic_compaction_seconds.
  env_->addon_time_.fetch_add(50 * 60 * 60);
  ASSERT_OK(Put("c", "1"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // Previous L0 file falls one level at a time to bottom level due to ttl.
  // And all 4 bottom files go through periodic compactions.
  ASSERT_EQ("1,0,0,4", FilesPerLevel());
  ASSERT_EQ(6, periodic_compactions);
  ASSERT_EQ(6, ttl_compactions);

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
// Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
// compaction only triggers flush after it's sure stall won't be triggered for // compaction only triggers flush after it's sure stall won't be triggered for

View File

@ -188,6 +188,20 @@ static Status ValidateOptions(
"TTL is only supported in Block-Based Table format. "); "TTL is only supported in Block-Based Table format. ");
} }
} }
if (cfd.options.periodic_compaction_seconds > 0) {
if (db_options.max_open_files != -1) {
return Status::NotSupported(
"Periodic Compaction is only supported when files are always "
"kept open (set max_open_files = -1). ");
}
if (cfd.options.table_factory->Name() !=
BlockBasedTableFactory().Name()) {
return Status::NotSupported(
"Periodic Compaction is only supported in "
"Block-Based Table format. ");
}
}
} }
if (db_options.db_paths.size() > 4) { if (db_options.db_paths.size() > 4) {

View File

@ -376,7 +376,7 @@ Status FlushJob::WriteLevel0Table() {
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time, Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
oldest_key_time, write_hint); oldest_key_time, write_hint, current_time);
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);
} }
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,

View File

@ -1704,6 +1704,10 @@ void VersionStorageInfo::ComputeCompactionScore(
if (mutable_cf_options.ttl > 0) { if (mutable_cf_options.ttl > 0) {
ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl); ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
} }
if (mutable_cf_options.periodic_compaction_seconds > 0) {
ComputeFilesMarkedForPeriodicCompaction(
immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
}
EstimateCompactionBytesNeeded(mutable_cf_options); EstimateCompactionBytesNeeded(mutable_cf_options);
} }
@ -1757,6 +1761,36 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
} }
} }
// Rebuilds files_marked_for_periodic_compaction_ from scratch.
//
// A file at any level is marked when its `file_creation_time` table property
// is known (non-zero) and lies more than `periodic_compaction_seconds` before
// the current time reported by ioptions.env.
//
// A file is skipped when it is already being compacted, or when it has no
// table reader or no table properties available.
//
// The list is left empty when the current time cannot be obtained, or when
// `periodic_compaction_seconds` is larger than the current time (no file can
// be that old).
//
// REQUIRES: periodic_compaction_seconds > 0
void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
    const ImmutableCFOptions& ioptions,
    const uint64_t periodic_compaction_seconds) {
  assert(periodic_compaction_seconds > 0);

  files_marked_for_periodic_compaction_.clear();

  int64_t temp_current_time = 0;
  auto status = ioptions.env->GetCurrentTime(&temp_current_time);
  if (!status.ok()) {
    return;
  }
  const uint64_t current_time = static_cast<uint64_t>(temp_current_time);

  // Both operands are unsigned, so subtracting a period that is larger than
  // the current time would wrap around to a huge limit and mark every file
  // with a known creation time as overdue. This can happen with a mock Env
  // whose clock starts near zero.
  if (periodic_compaction_seconds > current_time) {
    return;
  }
  const uint64_t allowed_time_limit =
      current_time - periodic_compaction_seconds;

  for (int level = 0; level < num_levels(); level++) {
    for (auto f : files_[level]) {
      if (f->being_compacted || f->fd.table_reader == nullptr) {
        continue;
      }
      // Fetch the properties once instead of once per check.
      const auto table_props = f->fd.table_reader->GetTableProperties();
      if (table_props == nullptr) {
        continue;
      }
      const uint64_t file_creation_time = table_props->file_creation_time;
      // 0 means the creation time is unknown; such files are never marked.
      if (file_creation_time > 0 && file_creation_time < allowed_time_limit) {
        files_marked_for_periodic_compaction_.emplace_back(level, f);
      }
    }
  }
}
namespace { namespace {
// used to sort files by size // used to sort files by size

View File

@ -139,6 +139,12 @@ class VersionStorageInfo {
void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions, void ComputeExpiredTtlFiles(const ImmutableCFOptions& ioptions,
const uint64_t ttl); const uint64_t ttl);
// This computes files_marked_for_periodic_compaction_ and is called by
// ComputeCompactionScore()
void ComputeFilesMarkedForPeriodicCompaction(
const ImmutableCFOptions& ioptions,
const uint64_t periodic_compaction_seconds);
// This computes bottommost_files_marked_for_compaction_ and is called by // This computes bottommost_files_marked_for_compaction_ and is called by
// ComputeCompactionScore() or UpdateOldestSnapshot(). // ComputeCompactionScore() or UpdateOldestSnapshot().
// //
@ -300,6 +306,14 @@ class VersionStorageInfo {
return expired_ttl_files_; return expired_ttl_files_;
} }
// Returns the (level, file) pairs currently marked for periodic compaction.
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>&
FilesMarkedForPeriodicCompaction() const {
  assert(finalized_);
  const auto& marked_files = files_marked_for_periodic_compaction_;
  return marked_files;
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo) // REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access // REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>& const autovector<std::pair<int, FileMetaData*>>&
@ -465,6 +479,9 @@ class VersionStorageInfo {
autovector<std::pair<int, FileMetaData*>> expired_ttl_files_; autovector<std::pair<int, FileMetaData*>> expired_ttl_files_;
autovector<std::pair<int, FileMetaData*>>
files_marked_for_periodic_compaction_;
// These files are considered bottommost because none of their keys can exist // These files are considered bottommost because none of their keys can exist
// at lower levels. They are not necessarily all in the same level. The marked // at lower levels. They are not necessarily all in the same level. The marked
// ones are eligible for compaction because they contain duplicate key // ones are eligible for compaction because they contain duplicate key

View File

@ -644,6 +644,16 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API // Dynamically changeable through SetOptions() API
uint64_t ttl = 0; uint64_t ttl = 0;
// Files older than this value will be picked up for compaction, and
// re-written to the same level as they were before.
// Only supported in Level compaction.
// Pre-req: max_open_files == -1.
// unit: seconds. Ex: 7 days = 7 * 24 * 60 * 60
// Default: 0 (disabled)
//
// Dynamically changeable through SetOptions() API
uint64_t periodic_compaction_seconds = 0;
// Create ColumnFamilyOptions with default values for all fields // Create ColumnFamilyOptions with default values for all fields
AdvancedColumnFamilyOptions(); AdvancedColumnFamilyOptions();
// Create ColumnFamilyOptions from Options // Create ColumnFamilyOptions from Options

View File

@ -89,6 +89,8 @@ enum class CompactionReason : int {
kFlush, kFlush,
// Compaction caused by external sst file ingestion // Compaction caused by external sst file ingestion
kExternalSstIngestion, kExternalSstIngestion,
// Compaction due to SST file being too old
kPeriodicCompaction,
// total number of compaction reasons, new reasons must be added above this. // total number of compaction reasons, new reasons must be added above this.
kNumOfReasons, kNumOfReasons,
}; };

View File

@ -55,6 +55,7 @@ struct TablePropertiesNames {
static const std::string kCompression; static const std::string kCompression;
static const std::string kCreationTime; static const std::string kCreationTime;
static const std::string kOldestKeyTime; static const std::string kOldestKeyTime;
static const std::string kFileCreationTime;
}; };
extern const std::string kPropertiesBlock; extern const std::string kPropertiesBlock;
@ -168,11 +169,14 @@ struct TableProperties {
// by column_family_name. // by column_family_name.
uint64_t column_family_id = uint64_t column_family_id =
rocksdb::TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; rocksdb::TablePropertiesCollectorFactory::Context::kUnknownColumnFamily;
// The time when the SST file was created. // Timestamp of the latest key. 0 means unknown.
// Since SST files are immutable, this is equivalent to last modified time. // TODO(sagar0): Should be changed to latest_key_time ... but don't know the
// full implications of backward compatibility. Hence retaining for now.
uint64_t creation_time = 0; uint64_t creation_time = 0;
// Timestamp of the earliest key. 0 means unknown. // Timestamp of the earliest key. 0 means unknown.
uint64_t oldest_key_time = 0; uint64_t oldest_key_time = 0;
// Actual SST file creation time. 0 means unknown.
uint64_t file_creation_time = 0;
// Name of the column family with which this SST file is associated. // Name of the column family with which this SST file is associated.
// If column family is unknown, `column_family_name` will be an empty string. // If column family is unknown, `column_family_name` will be an empty string.

View File

@ -173,6 +173,8 @@ void MutableCFOptions::Dump(Logger* log) const {
max_bytes_for_level_multiplier); max_bytes_for_level_multiplier);
ROCKS_LOG_INFO(log, " ttl: %" PRIu64, ROCKS_LOG_INFO(log, " ttl: %" PRIu64,
ttl); ttl);
ROCKS_LOG_INFO(log, " periodic_compaction_seconds: %" PRIu64,
periodic_compaction_seconds);
std::string result; std::string result;
char buf[10]; char buf[10];
for (const auto m : max_bytes_for_level_multiplier_additional) { for (const auto m : max_bytes_for_level_multiplier_additional) {

View File

@ -151,6 +151,7 @@ struct MutableCFOptions {
max_bytes_for_level_base(options.max_bytes_for_level_base), max_bytes_for_level_base(options.max_bytes_for_level_base),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
ttl(options.ttl), ttl(options.ttl),
periodic_compaction_seconds(options.periodic_compaction_seconds),
max_bytes_for_level_multiplier_additional( max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional), options.max_bytes_for_level_multiplier_additional),
compaction_options_fifo(options.compaction_options_fifo), compaction_options_fifo(options.compaction_options_fifo),
@ -185,6 +186,7 @@ struct MutableCFOptions {
max_bytes_for_level_base(0), max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0), max_bytes_for_level_multiplier(0),
ttl(0), ttl(0),
periodic_compaction_seconds(0),
compaction_options_fifo(), compaction_options_fifo(),
max_sequential_skip_in_iterations(0), max_sequential_skip_in_iterations(0),
paranoid_file_checks(false), paranoid_file_checks(false),
@ -234,6 +236,7 @@ struct MutableCFOptions {
uint64_t max_bytes_for_level_base; uint64_t max_bytes_for_level_base;
double max_bytes_for_level_multiplier; double max_bytes_for_level_multiplier;
uint64_t ttl; uint64_t ttl;
uint64_t periodic_compaction_seconds;
std::vector<int> max_bytes_for_level_multiplier_additional; std::vector<int> max_bytes_for_level_multiplier_additional;
CompactionOptionsFIFO compaction_options_fifo; CompactionOptionsFIFO compaction_options_fifo;
CompactionOptionsUniversal compaction_options_universal; CompactionOptionsUniversal compaction_options_universal;

View File

@ -87,7 +87,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
paranoid_file_checks(options.paranoid_file_checks), paranoid_file_checks(options.paranoid_file_checks),
force_consistency_checks(options.force_consistency_checks), force_consistency_checks(options.force_consistency_checks),
report_bg_io_stats(options.report_bg_io_stats), report_bg_io_stats(options.report_bg_io_stats),
ttl(options.ttl) { ttl(options.ttl),
periodic_compaction_seconds(options.periodic_compaction_seconds) {
assert(memtable_factory.get() != nullptr); assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() < if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) { static_cast<unsigned int>(num_levels)) {
@ -349,7 +350,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
force_consistency_checks); force_consistency_checks);
ROCKS_LOG_HEADER(log, " Options.report_bg_io_stats: %d", ROCKS_LOG_HEADER(log, " Options.report_bg_io_stats: %d",
report_bg_io_stats); report_bg_io_stats);
ROCKS_LOG_HEADER(log, " Options.ttl: %d", ttl); ROCKS_LOG_HEADER(log, " Options.ttl: %" PRIu64,
ttl);
ROCKS_LOG_HEADER(log,
" Options.periodic_compaction_seconds: %" PRIu64,
periodic_compaction_seconds);
} // ColumnFamilyOptions::Dump } // ColumnFamilyOptions::Dump
void Options::Dump(Logger* log) const { void Options::Dump(Logger* log) const {

View File

@ -176,6 +176,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
cf_opts.max_bytes_for_level_multiplier = cf_opts.max_bytes_for_level_multiplier =
mutable_cf_options.max_bytes_for_level_multiplier; mutable_cf_options.max_bytes_for_level_multiplier;
cf_opts.ttl = mutable_cf_options.ttl; cf_opts.ttl = mutable_cf_options.ttl;
cf_opts.periodic_compaction_seconds =
mutable_cf_options.periodic_compaction_seconds;
cf_opts.max_bytes_for_level_multiplier_additional.clear(); cf_opts.max_bytes_for_level_multiplier_additional.clear();
for (auto value : for (auto value :
@ -1911,7 +1913,11 @@ std::unordered_map<std::string, OptionTypeInfo>
{"ttl", {"ttl",
{offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T, {offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T,
OptionVerificationType::kNormal, true, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, ttl)}}}; offsetof(struct MutableCFOptions, ttl)}},
{"periodic_compaction_seconds",
{offset_of(&ColumnFamilyOptions::periodic_compaction_seconds),
OptionType::kUInt64T, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, periodic_compaction_seconds)}}};
std::unordered_map<std::string, OptionTypeInfo> std::unordered_map<std::string, OptionTypeInfo>
OptionsHelper::fifo_compaction_options_type_info = { OptionsHelper::fifo_compaction_options_type_info = {

View File

@ -451,6 +451,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"disable_auto_compactions=false;" "disable_auto_compactions=false;"
"report_bg_io_stats=true;" "report_bg_io_stats=true;"
"ttl=60;" "ttl=60;"
"periodic_compaction_seconds=3600;"
"compaction_options_fifo={max_table_files_size=3;allow_" "compaction_options_fifo={max_table_files_size=3;allow_"
"compaction=false;};", "compaction=false;};",
new_options)); new_options));

View File

@ -318,6 +318,7 @@ struct BlockBasedTableBuilder::Rep {
uint64_t creation_time = 0; uint64_t creation_time = 0;
uint64_t oldest_key_time = 0; uint64_t oldest_key_time = 0;
const uint64_t target_file_size; const uint64_t target_file_size;
uint64_t file_creation_time = 0;
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors; std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
@ -330,7 +331,8 @@ struct BlockBasedTableBuilder::Rep {
const CompressionType _compression_type, const CompressionType _compression_type,
const CompressionOptions& _compression_opts, const bool skip_filters, const CompressionOptions& _compression_opts, const bool skip_filters,
const std::string& _column_family_name, const uint64_t _creation_time, const std::string& _column_family_name, const uint64_t _creation_time,
const uint64_t _oldest_key_time, const uint64_t _target_file_size) const uint64_t _oldest_key_time, const uint64_t _target_file_size,
const uint64_t _file_creation_time)
: ioptions(_ioptions), : ioptions(_ioptions),
moptions(_moptions), moptions(_moptions),
table_options(table_opt), table_options(table_opt),
@ -366,7 +368,8 @@ struct BlockBasedTableBuilder::Rep {
column_family_name(_column_family_name), column_family_name(_column_family_name),
creation_time(_creation_time), creation_time(_creation_time),
oldest_key_time(_oldest_key_time), oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size) { target_file_size(_target_file_size),
file_creation_time(_file_creation_time) {
if (table_options.index_type == if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) { BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@ -417,7 +420,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
const CompressionType compression_type, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters, const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time, const std::string& column_family_name, const uint64_t creation_time,
const uint64_t oldest_key_time, const uint64_t target_file_size) { const uint64_t oldest_key_time, const uint64_t target_file_size,
const uint64_t file_creation_time) {
BlockBasedTableOptions sanitized_table_options(table_options); BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 && if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) { sanitized_table_options.checksum != kCRC32c) {
@ -434,7 +438,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
internal_comparator, int_tbl_prop_collector_factories, internal_comparator, int_tbl_prop_collector_factories,
column_family_id, file, compression_type, compression_opts, column_family_id, file, compression_type, compression_opts,
skip_filters, column_family_name, creation_time, skip_filters, column_family_name, creation_time,
oldest_key_time, target_file_size); oldest_key_time, target_file_size, file_creation_time);
if (rep_->filter_builder != nullptr) { if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0); rep_->filter_builder->StartBlock(0);
@ -908,6 +912,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
rep_->use_delta_encoding_for_index_values; rep_->use_delta_encoding_for_index_values;
rep_->props.creation_time = rep_->creation_time; rep_->props.creation_time = rep_->creation_time;
rep_->props.oldest_key_time = rep_->oldest_key_time; rep_->props.oldest_key_time = rep_->oldest_key_time;
rep_->props.file_creation_time = rep_->file_creation_time;
// Add basic properties // Add basic properties
property_block_builder.AddTableProperty(rep_->props); property_block_builder.AddTableProperty(rep_->props);

View File

@ -47,7 +47,8 @@ class BlockBasedTableBuilder : public TableBuilder {
const CompressionType compression_type, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters, const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time = 0, const std::string& column_family_name, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0); const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0,
const uint64_t file_creation_time = 0);
// REQUIRES: Either Finish() or Abandon() has been called. // REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder(); ~BlockBasedTableBuilder();

View File

@ -219,7 +219,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
table_builder_options.column_family_name, table_builder_options.column_family_name,
table_builder_options.creation_time, table_builder_options.creation_time,
table_builder_options.oldest_key_time, table_builder_options.oldest_key_time,
table_builder_options.target_file_size); table_builder_options.target_file_size,
table_builder_options.file_creation_time);
return table_builder; return table_builder;
} }

View File

@ -89,6 +89,9 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id);
Add(TablePropertiesNames::kCreationTime, props.creation_time); Add(TablePropertiesNames::kCreationTime, props.creation_time);
Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time); Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time);
if (props.file_creation_time > 0) {
Add(TablePropertiesNames::kFileCreationTime, props.file_creation_time);
}
if (!props.filter_policy_name.empty()) { if (!props.filter_policy_name.empty()) {
Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name);
@ -247,6 +250,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
&new_table_properties->creation_time}, &new_table_properties->creation_time},
{TablePropertiesNames::kOldestKeyTime, {TablePropertiesNames::kOldestKeyTime,
&new_table_properties->oldest_key_time}, &new_table_properties->oldest_key_time},
{TablePropertiesNames::kFileCreationTime,
&new_table_properties->file_creation_time},
}; };
std::string last_key; std::string last_key;

View File

@ -77,7 +77,8 @@ struct TableBuilderOptions {
const CompressionOptions& _compression_opts, bool _skip_filters, const CompressionOptions& _compression_opts, bool _skip_filters,
const std::string& _column_family_name, int _level, const std::string& _column_family_name, int _level,
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0, const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0,
const uint64_t _target_file_size = 0) const uint64_t _target_file_size = 0,
const uint64_t _file_creation_time = 0)
: ioptions(_ioptions), : ioptions(_ioptions),
moptions(_moptions), moptions(_moptions),
internal_comparator(_internal_comparator), internal_comparator(_internal_comparator),
@ -89,7 +90,8 @@ struct TableBuilderOptions {
level(_level), level(_level),
creation_time(_creation_time), creation_time(_creation_time),
oldest_key_time(_oldest_key_time), oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size) {} target_file_size(_target_file_size),
file_creation_time(_file_creation_time) {}
const ImmutableCFOptions& ioptions; const ImmutableCFOptions& ioptions;
const MutableCFOptions& moptions; const MutableCFOptions& moptions;
const InternalKeyComparator& internal_comparator; const InternalKeyComparator& internal_comparator;
@ -103,6 +105,7 @@ struct TableBuilderOptions {
const uint64_t creation_time; const uint64_t creation_time;
const int64_t oldest_key_time; const int64_t oldest_key_time;
const uint64_t target_file_size; const uint64_t target_file_size;
const uint64_t file_creation_time;
}; };
// TableBuilder provides the interface used to build a Table // TableBuilder provides the interface used to build a Table

View File

@ -158,6 +158,9 @@ std::string TableProperties::ToString(
AppendProperty(result, "time stamp of earliest key", oldest_key_time, AppendProperty(result, "time stamp of earliest key", oldest_key_time,
prop_delim, kv_delim); prop_delim, kv_delim);
AppendProperty(result, "file creation time", file_creation_time, prop_delim,
kv_delim);
return result; return result;
} }
@ -226,6 +229,8 @@ const std::string TablePropertiesNames::kCompression = "rocksdb.compression";
const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time"; const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time";
const std::string TablePropertiesNames::kOldestKeyTime = const std::string TablePropertiesNames::kOldestKeyTime =
"rocksdb.oldest.key.time"; "rocksdb.oldest.key.time";
// Name of the table property that carries TableProperties::file_creation_time.
// PropertyBlockBuilder::AddTableProperty writes it only when the value is
// greater than zero, and ReadProperties maps this name back to the
// file_creation_time field of the properties it loads.
const std::string TablePropertiesNames::kFileCreationTime =
"rocksdb.file.creation.time";
extern const std::string kPropertiesBlock = "rocksdb.properties"; extern const std::string kPropertiesBlock = "rocksdb.properties";
// Old property block name for backward compatibility // Old property block name for backward compatibility

View File

@ -346,6 +346,7 @@ void RandomInitCFOptions(ColumnFamilyOptions* cf_opt, Random* rnd) {
// uint64_t options // uint64_t options
static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX); static const uint64_t uint_max = static_cast<uint64_t>(UINT_MAX);
cf_opt->ttl = uint_max + rnd->Uniform(10000); cf_opt->ttl = uint_max + rnd->Uniform(10000);
cf_opt->periodic_compaction_seconds = uint_max + rnd->Uniform(10000);
cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000); cf_opt->max_sequential_skip_in_iterations = uint_max + rnd->Uniform(10000);
cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000); cf_opt->target_file_size_base = uint_max + rnd->Uniform(10000);
cf_opt->max_compaction_bytes = cf_opt->max_compaction_bytes =