From e7c24168d85b2ebb43fe939f8d79797fa7308a41 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 9 Aug 2021 12:50:19 -0700 Subject: [PATCH] Move old files to warm tier in FIFO compactions (#8310) Summary: Some FIFO users want to keep the data for longer, but the old data is rarely accessed. This feature allows users to configure FIFO compaction so that data older than a threshold is moved to a warm storage tier. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8310 Test Plan: Add several unit tests. Reviewed By: ajkr Differential Revision: D28493792 fbshipit-source-id: c14824ea634814dee5278b449ab5c98b6e0b5501 --- HISTORY.md | 1 + db/compaction/compaction.cc | 13 +- db/compaction/compaction.h | 6 +- db/compaction/compaction_job.cc | 9 +- db/compaction/compaction_job_test.cc | 4 +- db/compaction/compaction_picker.cc | 8 +- db/compaction/compaction_picker_fifo.cc | 117 ++++++++- db/compaction/compaction_picker_fifo.h | 6 + db/compaction/compaction_picker_level.cc | 1 + db/compaction/compaction_picker_test.cc | 245 +++++++++++++++++- db/compaction/compaction_picker_universal.cc | 3 + db/db_compaction_test.cc | 63 +++++ db/version_set.cc | 8 +- file/read_write_util.cc | 2 + include/rocksdb/advanced_options.h | 4 + include/rocksdb/listener.h | 2 + java/rocksjni/portal.h | 4 + .../java/org/rocksdb/CompactionReason.java | 12 +- options/cf_options.cc | 4 + options/options_settable_test.cc | 2 +- tools/db_bench_tool.cc | 3 + 21 files changed, 497 insertions(+), 20 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 29955d31a..d6a6b5ce2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ * EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file. * Insert warm blocks (data blocks, uncompressed dict blocks, index and filter blocks) in Block cache during flush under option BlockBasedTableOptions.prepopulate_block_cache. Previously it was enabled for only data blocks. * BlockBasedTableOptions.prepopulate_block_cache can be dynamically configured using DB::SetOptions. +* Add CompactionOptionsFIFO.age_for_warm, which allows RocksDB to move old files to warm tier in FIFO compactions. Note that file temperature is still an experimental feature. ### Performance Improvements * Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value. diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index f2de4f0e8..1f5ab9c7d 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -211,9 +211,9 @@ Compaction::Compaction( std::vector _inputs, int _output_level, uint64_t _target_file_size, uint64_t _max_compaction_bytes, uint32_t _output_path_id, CompressionType _compression, - CompressionOptions _compression_opts, uint32_t _max_subcompactions, - std::vector _grandparents, bool _manual_compaction, - double _score, bool _deletion_compaction, + CompressionOptions _compression_opts, Temperature _output_temperature, + uint32_t _max_subcompactions, std::vector _grandparents, + bool _manual_compaction, double _score, bool _deletion_compaction, CompactionReason _compaction_reason) : input_vstorage_(vstorage), start_level_(_inputs[0].level), @@ -229,6 +229,7 @@ Compaction::Compaction( output_path_id_(_output_path_id), output_compression_(_compression), output_compression_opts_(_compression_opts), + output_temperature_(_output_temperature), deletion_compaction_(_deletion_compaction), inputs_(PopulateWithAtomicBoundaries(vstorage, std::move(_inputs))), grandparents_(std::move(_grandparents)), @@ -308,6 +309,12 @@ bool Compaction::IsTrivialMove() const { return false; } + if (start_level_ == output_level_) { + // It doesn't make sense if compaction picker picks files just to trivial + // move to the same level. + return false; + } + // Used in universal compaction, where trivial move can be done if the // input files are non overlapping if ((mutable_cf_options_.compaction_options_universal.allow_trivial_move) && diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 7854c1c7a..6791ee5a2 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -76,7 +76,8 @@ class Compaction { std::vector inputs, int output_level, uint64_t target_file_size, uint64_t max_compaction_bytes, uint32_t output_path_id, CompressionType compression, - CompressionOptions compression_opts, uint32_t max_subcompactions, + CompressionOptions compression_opts, + Temperature output_temperature, uint32_t max_subcompactions, std::vector grandparents, bool manual_compaction = false, double score = -1, bool deletion_compaction = false, @@ -299,6 +300,8 @@ class Compaction { uint64_t max_compaction_bytes() const { return max_compaction_bytes_; } + Temperature output_temperature() const { return output_temperature_; } + uint32_t max_subcompactions() const { return max_subcompactions_; } uint64_t MinInputFileOldestAncesterTime() const; @@ -356,6 +359,7 @@ class Compaction { const uint32_t output_path_id_; CompressionType output_compression_; CompressionOptions output_compression_opts_; + Temperature output_temperature_; // If true, then the compaction can be done by simply deleting input files. const bool deletion_compaction_; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 071ec6cf8..e6ff030fb 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -107,6 +107,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) { return "ExternalSstIngestion"; case CompactionReason::kPeriodicCompaction: return "PeriodicCompaction"; + case CompactionReason::kChangeTemperature: + return "ChangeTemperature"; case CompactionReason::kNumOfReasons: // fall through default: @@ -1927,11 +1929,12 @@ Status CompactionJob::OpenCompactionOutputFile( // Pass temperature of botommost files to FileSystem. FileOptions fo_copy = file_options_; - Temperature temperature = Temperature::kUnknown; - if (bottommost_level_) { - fo_copy.temperature = temperature = + Temperature temperature = sub_compact->compaction->output_temperature(); + if (temperature == Temperature::kUnknown && bottommost_level_) { + temperature = sub_compact->compaction->mutable_cf_options()->bottommost_temperature; } + fo_copy.temperature = temperature; Status s; IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy); diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 7437f1249..b4e965387 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -334,8 +334,8 @@ class CompactionJobTestBase : public testing::Test { cfd->current()->storage_info(), *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), mutable_db_options_, compaction_input_files, output_level, 1024 * 1024, 10 * 1024 * 1024, 0, - kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, 0, - {}, true); + kNoCompression, cfd->GetLatestMutableCFOptions()->compression_opts, + Temperature::kUnknown, 0, {}, true); compaction.SetInputVersion(cfd->current()); LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index 6d1092134..fe2fea1ef 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -358,7 +358,7 @@ Compaction* CompactionPicker::CompactFiles( output_level, compact_options.output_file_size_limit, mutable_cf_options.max_compaction_bytes, output_path_id, compression_type, GetCompressionOptions(mutable_cf_options, vstorage, output_level), - compact_options.max_subcompactions, + Temperature::kUnknown, compact_options.max_subcompactions, /* grandparents */ {}, true); RegisterCompaction(c); return c; @@ -634,7 +634,8 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, 1), GetCompressionOptions(mutable_cf_options, vstorage, output_level), - compact_range_options.max_subcompactions, /* grandparents */ {}, + Temperature::kUnknown, compact_range_options.max_subcompactions, + /* grandparents */ {}, /* is manual */ true); RegisterCompaction(c); vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); @@ -812,7 +813,8 @@ Compaction* CompactionPicker::CompactRange( GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, vstorage->base_level()), GetCompressionOptions(mutable_cf_options, vstorage, output_level), - compact_range_options.max_subcompactions, std::move(grandparents), + Temperature::kUnknown, compact_range_options.max_subcompactions, + std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); diff --git a/db/compaction/compaction_picker_fifo.cc b/db/compaction/compaction_picker_fifo.cc index 4b4c09b80..01932f483 100644 --- a/db/compaction/compaction_picker_fifo.cc +++ b/db/compaction/compaction_picker_fifo.cc @@ -111,7 +111,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, mutable_db_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, - mutable_cf_options.compression_opts, + mutable_cf_options.compression_opts, Temperature::kUnknown, /* max_subcompactions */ 0, {}, /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOTtl); @@ -154,7 +154,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( {comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */, 0 /* max compaction bytes, not applicable */, 0 /* output path ID */, mutable_cf_options.compression, - mutable_cf_options.compression_opts, 0 /* max_subcompactions */, {}, + mutable_cf_options.compression_opts, Temperature::kUnknown, + 0 /* max_subcompactions */, {}, /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ false, CompactionReason::kFIFOReduceNumFiles); @@ -203,13 +204,119 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( Compaction* c = new Compaction( vstorage, ioptions_, mutable_cf_options, mutable_db_options, std::move(inputs), 0, 0, 0, 0, kNoCompression, - mutable_cf_options.compression_opts, + mutable_cf_options.compression_opts, Temperature::kUnknown, /* max_subcompactions */ 0, {}, /* is manual */ false, vstorage->CompactionScore(0), /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); return c; } +Compaction* FIFOCompactionPicker::PickCompactionToWarm( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, + LogBuffer* log_buffer) { + if (mutable_cf_options.compaction_options_fifo.age_for_warm == 0) { + return nullptr; + } + + const int kLevel0 = 0; + const std::vector& level_files = vstorage->LevelFiles(kLevel0); + + int64_t _current_time; + auto status = ioptions_.clock->GetCurrentTime(&_current_time); + if (!status.ok()) { + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: Couldn't get current time: %s. " + "Not doing compactions based on warm threshold. ", + cf_name.c_str(), status.ToString().c_str()); + return nullptr; + } + const uint64_t current_time = static_cast(_current_time); + + if (!level0_compactions_in_progress_.empty()) { + ROCKS_LOG_BUFFER( + log_buffer, + "[%s] FIFO compaction: Already executing compaction. Parallel " + "compactions are not supported", + cf_name.c_str()); + return nullptr; + } + + std::vector inputs; + inputs.emplace_back(); + inputs[0].level = 0; + + // avoid underflow + if (current_time > mutable_cf_options.compaction_options_fifo.age_for_warm) { + uint64_t create_time_threshold = + current_time - mutable_cf_options.compaction_options_fifo.age_for_warm; + uint64_t compaction_size = 0; + // We will ideally identify a file qualifying for warm tier by knowing + // the timestamp for the youngest entry in the file. However, right now + // we don't have the information. We infer it by looking at timestamp + // of the next file's (which is just younger) oldest entry's timestamp. + FileMetaData* prev_file = nullptr; + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + FileMetaData* f = *ritr; + assert(f); + if (f->being_compacted) { + // Right now this probably won't happen as we never try to schedule + // two compactions in parallel, so here we just simply don't schedule + // anything. + return nullptr; + } + uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime(); + if (oldest_ancester_time == kUnknownOldestAncesterTime) { + // Older files might not have enough information. It is possible to + // handle these files by looking at newer files, but maintaining the + // logic isn't worth it. + break; + } + if (oldest_ancester_time > create_time_threshold) { + // The previous file (which has slightly older data) doesn't qualify + // for warm tier. + break; + } + if (prev_file != nullptr) { + compaction_size += prev_file->fd.GetFileSize(); + if (compaction_size > mutable_cf_options.max_compaction_bytes) { + break; + } + inputs[0].files.push_back(prev_file); + ROCKS_LOG_BUFFER(log_buffer, + "[%s] FIFO compaction: picking file %" PRIu64 + " with next file's oldest time %" PRIu64 " for warm", + cf_name.c_str(), prev_file->fd.GetNumber(), + oldest_ancester_time); + } + if (f->temperature == Temperature::kUnknown || + f->temperature == Temperature::kHot) { + prev_file = f; + } else if (!inputs[0].files.empty()) { + // A warm file newer than files picked. + break; + } else { + assert(prev_file == nullptr); + } + } + } + + if (inputs[0].files.empty()) { + return nullptr; + } + + Compaction* c = new Compaction( + vstorage, ioptions_, mutable_cf_options, mutable_db_options, + std::move(inputs), 0, 0 /* output file size limit */, + 0 /* max compaction bytes, not applicable */, 0 /* output path ID */, + mutable_cf_options.compression, mutable_cf_options.compression_opts, + Temperature::kWarm, + /* max_subcompactions */ 0, {}, /* is manual */ false, + vstorage->CompactionScore(0), + /* is deletion compaction */ false, CompactionReason::kChangeTemperature); + return c; +} + Compaction* FIFOCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, @@ -225,6 +332,10 @@ Compaction* FIFOCompactionPicker::PickCompaction( c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options, vstorage, log_buffer); } + if (c == nullptr) { + c = PickCompactionToWarm(cf_name, mutable_cf_options, mutable_db_options, + vstorage, log_buffer); + } RegisterCompaction(c); return c; } diff --git a/db/compaction/compaction_picker_fifo.h b/db/compaction/compaction_picker_fifo.h index 2a07f8df7..b0d58aa9d 100644 --- a/db/compaction/compaction_picker_fifo.h +++ b/db/compaction/compaction_picker_fifo.h @@ -52,6 +52,12 @@ class FIFOCompactionPicker : public CompactionPicker { const MutableDBOptions& mutable_db_options, VersionStorageInfo* version, LogBuffer* log_buffer); + + Compaction* PickCompactionToWarm(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + const MutableDBOptions& mutable_db_options, + VersionStorageInfo* version, + LogBuffer* log_buffer); }; } // namespace ROCKSDB_NAMESPACE #endif // !ROCKSDB_LITE diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index 08c48c8f0..28c565b1b 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -336,6 +336,7 @@ Compaction* LevelCompactionBuilder::GetCompaction() { GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, output_level_, vstorage_->base_level()), GetCompressionOptions(mutable_cf_options_, vstorage_, output_level_), + Temperature::kUnknown, /* max_subcompactions */ 0, std::move(grandparents_), is_manual_, start_level_score_, false /* deletion_compaction */, compaction_reason_); diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 5d543048f..bce3a2076 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -98,7 +98,9 @@ class CompactionPickerTest : public testing::Test { void Add(int level, uint32_t file_number, const char* smallest, const char* largest, uint64_t file_size = 1, uint32_t path_id = 0, SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100, - size_t compensated_file_size = 0, bool marked_for_compact = false) { + size_t compensated_file_size = 0, bool marked_for_compact = false, + Temperature temperature = Temperature::kUnknown, + uint64_t oldest_ancestor_time = kUnknownOldestAncesterTime) { VersionStorageInfo* vstorage; if (temp_vstorage_) { vstorage = temp_vstorage_.get(); @@ -115,6 +117,8 @@ class CompactionPickerTest : public testing::Test { kUnknownFileChecksum, kUnknownFileChecksumFuncName); f->compensated_file_size = (compensated_file_size != 0) ? compensated_file_size : file_size; + f->temperature = temperature; + f->oldest_ancester_time = oldest_ancestor_time; vstorage->AddFile(level, f); files_.emplace_back(f); file_map_.insert({file_number, {f, level}}); @@ -757,6 +761,245 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { vstorage_->CompactionScore(0) >= 1); } } + +TEST_F(CompactionPickerTest, FIFOToWarm1) { + NewVersionStorage(1, kCompactionStyleFIFO); + const uint64_t kFileSize = 100000; + const uint64_t kMaxSize = kFileSize * 100000; + uint64_t kWarmThreshold = 2000; + + fifo_options_.max_table_files_size = kMaxSize; + fifo_options_.age_for_warm = kWarmThreshold; + mutable_cf_options_.compaction_options_fifo = fifo_options_; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_compaction_bytes = kFileSize * 100; + FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + + int64_t current_time = 0; + ASSERT_OK(Env::Default()->GetCurrentTime(¤t_time)); + uint64_t threshold_time = + static_cast(current_time) - kWarmThreshold; + Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true, + Temperature::kUnknown, static_cast(current_time) - 100); + Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true, + Temperature::kUnknown, threshold_time + 100); + Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true, + Temperature::kUnknown, threshold_time - 2000); + Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true, + Temperature::kUnknown, threshold_time - 3000); + UpdateVersionStorageInfo(); + + ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); + std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(3U, compaction->input(0, 0)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, FIFOToWarm2) { + NewVersionStorage(1, kCompactionStyleFIFO); + const uint64_t kFileSize = 100000; + const uint64_t kMaxSize = kFileSize * 100000; + uint64_t kWarmThreshold = 2000; + + fifo_options_.max_table_files_size = kMaxSize; + fifo_options_.age_for_warm = kWarmThreshold; + mutable_cf_options_.compaction_options_fifo = fifo_options_; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_compaction_bytes = kFileSize * 100; + FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + + int64_t current_time = 0; + ASSERT_OK(Env::Default()->GetCurrentTime(¤t_time)); + uint64_t threshold_time = + static_cast(current_time) - kWarmThreshold; + Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true, + Temperature::kUnknown, static_cast(current_time) - 100); + Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true, + Temperature::kUnknown, threshold_time + 100); + Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true, + Temperature::kUnknown, threshold_time - 2000); + Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true, + Temperature::kUnknown, threshold_time - 3000); + Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true, + Temperature::kUnknown, threshold_time - 4000); + UpdateVersionStorageInfo(); + + ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); + std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, FIFOToWarmMaxSize) { + NewVersionStorage(1, kCompactionStyleFIFO); + const uint64_t kFileSize = 100000; + const uint64_t kMaxSize = kFileSize * 100000; + uint64_t kWarmThreshold = 2000; + + fifo_options_.max_table_files_size = kMaxSize; + fifo_options_.age_for_warm = kWarmThreshold; + mutable_cf_options_.compaction_options_fifo = fifo_options_; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_compaction_bytes = kFileSize * 9; + FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + + int64_t current_time = 0; + ASSERT_OK(Env::Default()->GetCurrentTime(¤t_time)); + uint64_t threshold_time = + static_cast(current_time) - kWarmThreshold; + Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true, + Temperature::kUnknown, static_cast(current_time) - 100); + Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true, + Temperature::kUnknown, threshold_time + 100); + Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true, + Temperature::kUnknown, threshold_time - 2000); + Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true, + Temperature::kUnknown, threshold_time - 3000); + Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true, + Temperature::kUnknown, threshold_time - 4000); + Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true, + Temperature::kUnknown, threshold_time - 5000); + UpdateVersionStorageInfo(); + + ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); + std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, FIFOToWarmWithExistingWarm) { + NewVersionStorage(1, kCompactionStyleFIFO); + const uint64_t kFileSize = 100000; + const uint64_t kMaxSize = kFileSize * 100000; + uint64_t kWarmThreshold = 2000; + + fifo_options_.max_table_files_size = kMaxSize; + fifo_options_.age_for_warm = kWarmThreshold; + mutable_cf_options_.compaction_options_fifo = fifo_options_; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_compaction_bytes = kFileSize * 100; + FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + + int64_t current_time = 0; + ASSERT_OK(Env::Default()->GetCurrentTime(¤t_time)); + uint64_t threshold_time = + static_cast(current_time) - kWarmThreshold; + Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true, + Temperature::kUnknown, static_cast(current_time) - 100); + Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true, + Temperature::kUnknown, threshold_time + 100); + Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true, + Temperature::kUnknown, threshold_time - 2000); + Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true, + Temperature::kUnknown, threshold_time - 3000); + Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true, + Temperature::kUnknown, threshold_time - 4000); + Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true, + Temperature::kWarm, threshold_time - 5000); + UpdateVersionStorageInfo(); + + ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); + std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, FIFOToWarmWithOngoing) { + NewVersionStorage(1, kCompactionStyleFIFO); + const uint64_t kFileSize = 100000; + const uint64_t kMaxSize = kFileSize * 100000; + uint64_t kWarmThreshold = 2000; + + fifo_options_.max_table_files_size = kMaxSize; + fifo_options_.age_for_warm = kWarmThreshold; + mutable_cf_options_.compaction_options_fifo = fifo_options_; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_compaction_bytes = kFileSize * 100; + FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + + int64_t current_time = 0; + ASSERT_OK(Env::Default()->GetCurrentTime(¤t_time)); + uint64_t threshold_time = + static_cast(current_time) - kWarmThreshold; + Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true, + Temperature::kUnknown, static_cast(current_time) - 100); + Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true, + Temperature::kUnknown, threshold_time + 100); + Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true, + Temperature::kUnknown, threshold_time - 2000); + Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true, + Temperature::kUnknown, threshold_time - 3000); + Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true, + Temperature::kUnknown, threshold_time - 4000); + Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true, + Temperature::kWarm, threshold_time - 5000); + file_map_[2].first->being_compacted = true; + UpdateVersionStorageInfo(); + + ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); + std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + // Stop if a file is being compacted + ASSERT_TRUE(compaction.get() == nullptr); +} + +TEST_F(CompactionPickerTest, FIFOToWarmWithHotBetweenWarms) { + NewVersionStorage(1, kCompactionStyleFIFO); + const uint64_t kFileSize = 100000; + const uint64_t kMaxSize = kFileSize * 100000; + uint64_t kWarmThreshold = 2000; + + fifo_options_.max_table_files_size = kMaxSize; + fifo_options_.age_for_warm = kWarmThreshold; + mutable_cf_options_.compaction_options_fifo = fifo_options_; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_compaction_bytes = kFileSize * 100; + FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); + + int64_t current_time = 0; + ASSERT_OK(Env::Default()->GetCurrentTime(¤t_time)); + uint64_t threshold_time = + static_cast(current_time) - kWarmThreshold; + Add(0, 6U, "240", "290", 2 * kFileSize, 0, 2900, 3000, 0, true, + Temperature::kUnknown, static_cast(current_time) - 100); + Add(0, 5U, "240", "290", 2 * kFileSize, 0, 2700, 2800, 0, true, + Temperature::kUnknown, threshold_time + 100); + Add(0, 4U, "260", "300", 1 * kFileSize, 0, 2500, 2600, 0, true, + Temperature::kUnknown, threshold_time - 2000); + Add(0, 3U, "200", "300", 4 * kFileSize, 0, 2300, 2400, 0, true, + Temperature::kWarm, threshold_time - 3000); + Add(0, 2U, "200", "300", 4 * kFileSize, 0, 2100, 2200, 0, true, + Temperature::kUnknown, threshold_time - 4000); + Add(0, 1U, "200", "300", 4 * kFileSize, 0, 2000, 2100, 0, true, + Temperature::kWarm, threshold_time - 5000); + UpdateVersionStorageInfo(); + + ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), true); + std::unique_ptr compaction(fifo_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + // Stop if a file is being compacted + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); +} + #endif // ROCKSDB_LITE TEST_F(CompactionPickerTest, CompactionPriMinOverlapping1) { diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index b6f38f828..211a4f468 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -728,6 +728,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( 1, enable_compression), GetCompressionOptions(mutable_cf_options_, vstorage_, start_level, enable_compression), + Temperature::kUnknown, /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, score_, false /* deletion_compaction */, compaction_reason); } @@ -955,6 +956,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, output_level, 1), GetCompressionOptions(mutable_cf_options_, vstorage_, output_level), + Temperature::kUnknown, /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, score_, false /* deletion_compaction */, CompactionReason::kFilesMarkedForCompaction); @@ -1029,6 +1031,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest( output_level, 1, true /* enable_compression */), GetCompressionOptions(mutable_cf_options_, vstorage_, output_level, true /* enable_compression */), + Temperature::kUnknown, /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, score_, false /* deletion_compaction */, compaction_reason); } diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 20b0a36b4..53dadde71 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6749,6 +6749,69 @@ TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest2) { Destroy(options); } +TEST_F(DBCompactionTest, FIFOWarm) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleFIFO; + options.num_levels = 1; + options.max_open_files = -1; + options.level0_file_num_compaction_trigger = 2; + options.create_if_missing = true; + CompactionOptionsFIFO fifo_options; + fifo_options.age_for_warm = 1000; + fifo_options.max_table_files_size = 100000000; + options.compaction_options_fifo = fifo_options; + env_->SetMockSleep(); + Reopen(options); + + int total_warm = 0; + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile::FileOptions.temperature", [&](void* arg) { + Temperature temperature = *(static_cast(arg)); + if (temperature == Temperature::kWarm) { + total_warm++; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // The file system does not support checksum handoff. The check + // will be ignored. + ASSERT_OK(Put(Key(0), "value1")); + env_->MockSleepForSeconds(800); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(0), "value1")); + env_->MockSleepForSeconds(800); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + + ASSERT_OK(Put(Key(0), "value1")); + env_->MockSleepForSeconds(800); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_OK(Put(Key(0), "value1")); + env_->MockSleepForSeconds(800); + ASSERT_OK(Put(Key(2), "value2")); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + + ColumnFamilyMetaData metadata; + db_->GetColumnFamilyMetaData(&metadata); + ASSERT_EQ(4, metadata.file_count); + ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature); + ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[1].temperature); + ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[2].temperature); + ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[3].temperature); + ASSERT_EQ(2, total_warm); + + Destroy(options); +} + #endif // !defined(ROCKSDB_LITE) } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index a2bfdac48..d8802825f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2615,7 +2615,12 @@ void VersionStorageInfo::ComputeCompactionScore( if (compaction_style_ == kCompactionStyleFIFO) { score = static_cast(total_size) / mutable_cf_options.compaction_options_fifo.max_table_files_size; - if (mutable_cf_options.compaction_options_fifo.allow_compaction) { + if (mutable_cf_options.compaction_options_fifo.allow_compaction || + mutable_cf_options.compaction_options_fifo.age_for_warm > 0) { + // Warm tier move can happen at any time. It's too expensive to + // check very file's timestamp now. For now, just trigger it + // slightly more frequently than FIFO compaction so that this + // happens first. score = std::max( static_cast(num_sorted_runs) / mutable_cf_options.level0_file_num_compaction_trigger, @@ -2627,7 +2632,6 @@ void VersionStorageInfo::ComputeCompactionScore( immutable_options, mutable_cf_options, files_[level])), score); } - } else { score = static_cast(num_sorted_runs) / mutable_cf_options.level0_file_num_compaction_trigger; diff --git a/file/read_write_util.cc b/file/read_write_util.cc index 9df6c5a39..cc4f6b849 100644 --- a/file/read_write_util.cc +++ b/file/read_write_util.cc @@ -17,6 +17,8 @@ namespace ROCKSDB_NAMESPACE { IOStatus NewWritableFile(FileSystem* fs, const std::string& fname, std::unique_ptr* result, const FileOptions& options) { + TEST_SYNC_POINT_CALLBACK("NewWritableFile::FileOptions.temperature", + const_cast(&options.temperature)); IOStatus s = fs->NewWritableFile(fname, options, result, nullptr); TEST_KILL_RANDOM_WITH_WEIGHT("NewWritableFile:0", REDUCE_ODDS2); return s; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 1c4f09b76..680c46317 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -70,6 +70,10 @@ struct CompactionOptionsFIFO { // Default: false; bool allow_compaction = false; + // When not 0, if the data in the file is older than this threshold, RocksDB + // will soon move the file to warm temperature. + uint64_t age_for_warm = 0; + CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} CompactionOptionsFIFO(uint64_t _max_table_files_size, bool _allow_compaction) : max_table_files_size(_max_table_files_size), diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index f59e395de..5216e1e24 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -93,6 +93,8 @@ enum class CompactionReason : int { kExternalSstIngestion, // Compaction due to SST file being too old kPeriodicCompaction, + // Compaction in order to move files to temperature + kChangeTemperature, // total number of compaction reasons, new reasons must be added above this. kNumOfReasons, }; diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index ce2c96a92..c5f34a8a2 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -6989,6 +6989,10 @@ class CompactionReasonJni { return ROCKSDB_NAMESPACE::CompactionReason::kFlush; case 0x0D: return ROCKSDB_NAMESPACE::CompactionReason::kExternalSstIngestion; + case 0x0E: + return ROCKSDB_NAMESPACE::CompactionReason::kPeriodicCompaction; + case 0x0F: + return ROCKSDB_NAMESPACE::CompactionReason::kChangeTemperature; default: // undefined/default return ROCKSDB_NAMESPACE::CompactionReason::kUnknown; diff --git a/java/src/main/java/org/rocksdb/CompactionReason.java b/java/src/main/java/org/rocksdb/CompactionReason.java index f18c48122..24e234450 100644 --- a/java/src/main/java/org/rocksdb/CompactionReason.java +++ b/java/src/main/java/org/rocksdb/CompactionReason.java @@ -78,7 +78,17 @@ public enum CompactionReason { /** * Compaction caused by external sst file ingestion */ - kExternalSstIngestion((byte)0x0D); + kExternalSstIngestion((byte) 0x0D), + + /** + * Compaction due to SST file being too old + */ + kPeriodicCompaction((byte) 0x0E), + + /** + * Compaction in order to move files to temperature + */ + kChangeTemperature((byte) 0x0F); private final byte value; diff --git a/options/cf_options.cc b/options/cf_options.cc index 830f820ef..465413170 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -172,6 +172,10 @@ static std::unordered_map {offsetof(struct CompactionOptionsFIFO, max_table_files_size), OptionType::kUInt64T, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"age_for_warm", + {offsetof(struct CompactionOptionsFIFO, age_for_warm), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"ttl", {0, OptionType::kUInt64T, OptionVerificationType::kDeprecated, OptionTypeFlags::kNone}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index c5fc0b235..5c79a0248 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -516,7 +516,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "enable_blob_garbage_collection=true;" "blob_garbage_collection_age_cutoff=0.5;" "compaction_options_fifo={max_table_files_size=3;allow_" - "compaction=false;};", + "compaction=false;age_for_warm=1;};", new_options)); ASSERT_EQ(unset_bytes_base, diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 1b2ad6988..bb69878a1 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -889,6 +889,8 @@ DEFINE_bool(fifo_compaction_allow_compaction, true, DEFINE_uint64(fifo_compaction_ttl, 0, "TTL for the SST Files in seconds."); +DEFINE_uint64(fifo_age_for_warm, 0, "age_for_warm for FIFO compaction."); + // Stacked BlobDB Options DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Open a BlobDB instance."); @@ -3964,6 +3966,7 @@ class Benchmark { options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024, FLAGS_fifo_compaction_allow_compaction); + options.compaction_options_fifo.age_for_warm = FLAGS_fifo_age_for_warm; #endif // ROCKSDB_LITE if (FLAGS_prefix_size != 0) { options.prefix_extractor.reset(