Try to start TTL compaction earlier when kMinOverlappingRatio is used (#8749)
Summary:
Right now, when options.ttl is set, compactions are triggered around the time TTL is reached. This can cause extra compactions, which are often bursty. This commit mitigates that by picking those files earlier in the normal compaction-picking process. It is implemented only for kMinOverlappingRatio with leveled compaction, since that is the default value and changing the other styles is more complicated.

When a file is aged more than ttl/2, RocksDB starts to boost its priority in the normal compaction-picking process, in the hope that by the time TTL is reached, very little extra compaction is needed. For this to work, another change is made: during a compaction, if an output-level file is older than ttl/2, output files are cut based on the original file's boundaries (if it is not in the last level). This ensures that after an old file is moved to the next level and new data is merged in from the upper level, the new data falling into this range isn't reset with the old timestamp. Without this change, in many cases most files in a level would keep their old timestamps even though they contain newer data, and we would get stuck there.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8749

Test Plan: Add a unit test to test the boosting logic. Will add a unit test to test it end-to-end.

Reviewed By: jay-zhuang

Differential Revision: D30735261

fbshipit-source-id: 503c2d89250b22911eb99e72b379be154de3428e
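As a minimal illustration of when the new behavior applies, here is a sketch (not part of the commit; the option names are the real RocksDB ones) configuring leveled compaction with kMinOverlappingRatio and a TTL:

#include "rocksdb/options.h"

// Sketch: the early-TTL boosting added by this commit only kicks in under
// this combination: leveled compaction (default), kMinOverlappingRatio
// priority (default), and a non-zero options.ttl.
rocksdb::Options MakeTtlBoostedOptions() {
  rocksdb::Options options;
  options.compaction_style = rocksdb::kCompactionStyleLevel;
  options.compaction_pri = rocksdb::kMinOverlappingRatio;
  options.ttl = 10 * 60 * 60;  // 10 hours; boosting starts around ttl/2
  return options;
}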
HISTORY.md
@@ -11,6 +11,9 @@
### Behavior Changes
* `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files; now it includes all input files.

### Public Interface Change
* When options.ttl is used with leveled compaction and compaction priority kMinOverlappingRatio, files exceeding half of the TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will need to be scheduled to clear them up. At the same time, when compacting files with data older than half of the TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly.

## 6.26.0 (2021-10-20)
### Bug Fixes
* Fixes a bug in directed IO mode when calling MultiGet() for blobs in the same blob file. The bug is caused by not sorting the blob read requests by file offsets.
db/compaction/compaction.cc
@@ -612,10 +612,19 @@ bool Compaction::DoesInputReferenceBlobFiles() const {
  return false;
}

-uint64_t Compaction::MinInputFileOldestAncesterTime() const {
+uint64_t Compaction::MinInputFileOldestAncesterTime(
+    const InternalKey* start, const InternalKey* end) const {
  uint64_t min_oldest_ancester_time = port::kMaxUint64;
  const InternalKeyComparator& icmp =
      column_family_data()->internal_comparator();
  for (const auto& level_files : inputs_) {
    for (const auto& file : level_files.files) {
      if (start != nullptr && icmp.Compare(file->largest, *start) < 0) {
        continue;
      }
      if (end != nullptr && icmp.Compare(file->smallest, *end) > 0) {
        continue;
      }
      uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
      if (oldest_ancester_time != 0) {
        min_oldest_ancester_time =
db/compaction/compaction.h
@@ -304,7 +304,10 @@ class Compaction {
  uint32_t max_subcompactions() const { return max_subcompactions_; }

-  uint64_t MinInputFileOldestAncesterTime() const;
+  // start and end are the sub-compaction range. Null if there is no boundary.
+  // They are used to filter out some input files by their ancester time range.
+  uint64_t MinInputFileOldestAncesterTime(const InternalKey* start,
+                                          const InternalKey* end) const;

  // Called by DBImpl::NotifyOnCompactionCompleted to make sure number of
  // compaction begin and compaction completion callbacks match.
db/compaction/compaction_job.cc
@@ -170,6 +170,13 @@ struct CompactionJob::SubcompactionState {
    }
  }

  // Files identified as having an old oldest ancester time; their ranges
  // should be isolated so that the output file(s) in those ranges can be
  // merged down for TTL, clearing the old timestamps for the range.
  std::vector<FileMetaData*> files_to_cut_for_ttl;
  int cur_files_to_cut_for_ttl = -1;
  int next_files_to_cut_for_ttl = 0;

  uint64_t current_output_file_size = 0;

  // State during the subcompaction
@@ -212,6 +219,8 @@ struct CompactionJob::SubcompactionState {
    return Status::OK();
  }

  void FillFilesToCutForTtl();

  // Returns true iff we should stop building the current output
  // before processing "internal_key".
  bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
@@ -244,6 +253,40 @@ struct CompactionJob::SubcompactionState {
      return true;
    }

    if (!files_to_cut_for_ttl.empty()) {
      if (cur_files_to_cut_for_ttl != -1) {
        // The previous key is inside the range of a file
        if (icmp->Compare(internal_key,
                          files_to_cut_for_ttl[cur_files_to_cut_for_ttl]
                              ->largest.Encode()) > 0) {
          next_files_to_cut_for_ttl = cur_files_to_cut_for_ttl + 1;
          cur_files_to_cut_for_ttl = -1;
          return true;
        }
      } else {
        // Look for the key position
        while (next_files_to_cut_for_ttl <
               static_cast<int>(files_to_cut_for_ttl.size())) {
          if (icmp->Compare(internal_key,
                            files_to_cut_for_ttl[next_files_to_cut_for_ttl]
                                ->smallest.Encode()) >= 0) {
            if (icmp->Compare(internal_key,
                              files_to_cut_for_ttl[next_files_to_cut_for_ttl]
                                  ->largest.Encode()) <= 0) {
              // Within the current file
              cur_files_to_cut_for_ttl = next_files_to_cut_for_ttl;
              return true;
            }
            // Beyond the current file
            next_files_to_cut_for_ttl++;
          } else {
            // Still falls into the gap
            break;
          }
        }
      }
    }

    return false;
  }
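To make the cursor logic above easier to follow, here is a simplified, self-contained sketch (not from the commit) of the same search over sorted, non-overlapping ranges, using plain strings in place of InternalKey/FileMetaData:

#include <string>
#include <vector>

struct Range { std::string smallest, largest; };

class CutTracker {
 public:
  // `ranges` must be sorted and non-overlapping, mirroring files in a level.
  explicit CutTracker(std::vector<Range> ranges) : ranges_(std::move(ranges)) {}

  // Returns true if the output should be cut before `key`, i.e. when `key`
  // enters one of the tracked ranges or leaves the range it was in.
  bool ShouldCutBefore(const std::string& key) {
    if (cur_ != -1) {
      // The previous key was inside ranges_[cur_]; cut when we step past it.
      if (key > ranges_[cur_].largest) {
        next_ = cur_ + 1;
        cur_ = -1;
        return true;
      }
    } else {
      while (next_ < static_cast<int>(ranges_.size())) {
        if (key >= ranges_[next_].smallest) {
          if (key <= ranges_[next_].largest) {
            cur_ = next_;  // key falls inside this range: cut here
            return true;
          }
          next_++;  // key is beyond this range; advance the cursor
        } else {
          break;  // key is still in the gap before ranges_[next_]
        }
      }
    }
    return false;
  }

 private:
  std::vector<Range> ranges_;
  int cur_ = -1;   // index of the range the previous key was in, or -1
  int next_ = 0;   // next candidate range to check
};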
@@ -256,6 +299,46 @@ struct CompactionJob::SubcompactionState {
  }
};

void CompactionJob::SubcompactionState::FillFilesToCutForTtl() {
  if (compaction->immutable_options()->compaction_style !=
          CompactionStyle::kCompactionStyleLevel ||
      compaction->immutable_options()->compaction_pri !=
          CompactionPri::kMinOverlappingRatio ||
      compaction->mutable_cf_options()->ttl == 0 ||
      compaction->num_input_levels() < 2 || compaction->bottommost_level()) {
    return;
  }

  // We define a new file as one with an oldest ancester time younger than
  // 1/4 of the TTL, and an old one as older than 1/2 of the TTL.
  int64_t temp_current_time;
  auto get_time_status = compaction->immutable_options()->clock->GetCurrentTime(
      &temp_current_time);
  if (!get_time_status.ok()) {
    return;
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  if (current_time < compaction->mutable_cf_options()->ttl) {
    return;
  }
  uint64_t old_age_thres =
      current_time - compaction->mutable_cf_options()->ttl / 2;

  const std::vector<FileMetaData*>& olevel =
      *(compaction->inputs(compaction->num_input_levels() - 1));
  for (FileMetaData* file : olevel) {
    // Worth filtering out by start and end?
    uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
    // Only pick old files that are not too small, to prevent a flood
    // of small files.
    if (oldest_ancester_time < old_age_thres &&
        file->fd.GetFileSize() >
            compaction->mutable_cf_options()->target_file_size_base / 2) {
      files_to_cut_for_ttl.push_back(file);
    }
  }
}

// Maintains state for the entire compaction
struct CompactionJob::CompactionState {
  Compaction* const compaction;
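A worked example of the selection predicate above, with illustrative numbers that are not from the commit:

#include <cstdint>

// A file in the output level is picked for boundary cutting if it is older
// than ttl/2 and at least half of target_file_size_base (to avoid a flood
// of small files).
bool ShouldCutForTtl(uint64_t now, uint64_t ttl, uint64_t oldest_ancester_time,
                     uint64_t file_size, uint64_t target_file_size_base) {
  const uint64_t old_age_thres = now - ttl / 2;
  return oldest_ancester_time < old_age_thres &&
         file_size > target_file_size_base / 2;
}

// E.g., with ttl = 36000 (10 hours), a file flushed 6 hours ago has
// oldest_ancester_time = now - 21600 < now - 18000, so a 40 MB file
// qualifies when target_file_size_base = 64 MB.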
@@ -1291,6 +1374,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
    sub_compact->FillFilesToCutForTtl();
    // ShouldStopBefore() maintains state based on keys processed so far. The
    // compaction loop always calls it on the "next" key, thus won't tell it the
    // first key. So we do that here.
@@ -1739,7 +1823,6 @@ Status CompactionJob::FinishCompactionOutputFile(
      meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
                                     tombstone.seq_,
                                     cfd->internal_comparator());

      // The smallest key in a file is used for range tombstone truncation, so
      // it cannot have a seqnum of 0 (unless the smallest data key in a file
      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
@@ -1763,6 +1846,23 @@ Status CompactionJob::FinishCompactionOutputFile(
  if (s.ok()) {
    meta->fd.file_size = current_bytes;
    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
    // With accurate smallest and largest keys, we can get a slightly more
    // accurate oldest ancester time.
    // This makes the oldest ancester time in the manifest more accurate than
    // in the table properties. Not sure how to resolve it.
    if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
      uint64_t refined_oldest_ancester_time;
      Slice new_smallest = meta->smallest.user_key();
      Slice new_largest = meta->largest.user_key();
      if (!new_largest.empty() && !new_smallest.empty()) {
        refined_oldest_ancester_time =
            sub_compact->compaction->MinInputFileOldestAncesterTime(
                &(meta->smallest), &(meta->largest));
        if (refined_oldest_ancester_time != port::kMaxUint64) {
          meta->oldest_ancester_time = refined_oldest_ancester_time;
        }
      }
    }
  }
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_bytes;
@@ -2033,8 +2133,17 @@ Status CompactionJob::OpenCompactionOutputFile(
                     get_time_status.ToString().c_str());
    }
    uint64_t current_time = static_cast<uint64_t>(temp_current_time);
    InternalKey tmp_start, tmp_end;
    if (sub_compact->start != nullptr) {
      tmp_start.SetMinPossibleForUserKey(*(sub_compact->start));
    }
    if (sub_compact->end != nullptr) {
      tmp_end.SetMinPossibleForUserKey(*(sub_compact->end));
    }
    uint64_t oldest_ancester_time =
-        sub_compact->compaction->MinInputFileOldestAncesterTime();
+        sub_compact->compaction->MinInputFileOldestAncesterTime(
+            (sub_compact->start != nullptr) ? &tmp_start : nullptr,
+            (sub_compact->end != nullptr) ? &tmp_end : nullptr);
    if (oldest_ancester_time == port::kMaxUint64) {
      oldest_ancester_time = current_time;
    }
db/compaction/compaction_picker_test.cc
@@ -3,15 +3,15 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <limits>
#include <string>
#include <utility>

#include "db/compaction/compaction.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/compaction/file_pri.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/string_util.h"
@@ -149,7 +149,7 @@ class CompactionPickerTest : public testing::Test {
    vstorage_ = std::move(temp_vstorage_);
  }
  vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_);
-  vstorage_->UpdateFilesByCompactionPri(ioptions_.compaction_pri);
+  vstorage_->UpdateFilesByCompactionPri(ioptions_, mutable_cf_options_);
  vstorage_->UpdateNumNonEmptyLevels();
  vstorage_->GenerateFileIndexer();
  vstorage_->GenerateLevelFilesBrief();
@@ -1673,6 +1673,66 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys11) {
  ASSERT_EQ(7U, compaction->input(1, 0)->fd.GetNumber());
}

TEST_F(CompactionPickerTest, FileTtlBooster) {
  // Set TTL to 2048.
  // TTL boosting for all levels starts at age 1024.
  // The whole TTL boost range is 2048 * 31 / 32 - 1024 = 1984 - 1024 = 960.
  // Counting down from the second last level, per-level ranges start at
  // 1024 + 480 (L5), 1024 + 240 (L4), and 1024 + 120 (L3).
  // Boosting step for L3: 120 / 16 = 7.5 -> 7.
  const uint64_t kCurrentTime = 1000000;
  FileMetaData meta;

  {
    FileTtlBooster booster(kCurrentTime, 2048, 7, 3);

    // Not triggering if the file is younger than ttl/2
    meta.oldest_ancester_time = kCurrentTime - 1023;
    ASSERT_EQ(1, booster.GetBoostScore(&meta));
    meta.oldest_ancester_time = kCurrentTime - 1024;
    ASSERT_EQ(1, booster.GetBoostScore(&meta));
    meta.oldest_ancester_time = kCurrentTime + 10;
    ASSERT_EQ(1, booster.GetBoostScore(&meta));

    // Within one boosting step
    meta.oldest_ancester_time = kCurrentTime - (1024 + 120 + 6);
    ASSERT_EQ(1, booster.GetBoostScore(&meta));

    // One boosting step
    meta.oldest_ancester_time = kCurrentTime - (1024 + 120 + 7);
    ASSERT_EQ(2, booster.GetBoostScore(&meta));
    meta.oldest_ancester_time = kCurrentTime - (1024 + 120 + 8);
    ASSERT_EQ(2, booster.GetBoostScore(&meta));

    // Multiple boosting steps
    meta.oldest_ancester_time = kCurrentTime - (1024 + 120 + 30);
    ASSERT_EQ(5, booster.GetBoostScore(&meta));

    // Very high boosting steps
    meta.oldest_ancester_time = kCurrentTime - (1024 + 120 + 700);
    ASSERT_EQ(101, booster.GetBoostScore(&meta));
  }
  {
    // Test the second last level
    FileTtlBooster booster(kCurrentTime, 2048, 7, 5);
    meta.oldest_ancester_time = kCurrentTime - (1024 + 480);
    ASSERT_EQ(1, booster.GetBoostScore(&meta));
    meta.oldest_ancester_time = kCurrentTime - (1024 + 480 + 60);
    ASSERT_EQ(3, booster.GetBoostScore(&meta));
  }
  {
    // Test the last level
    FileTtlBooster booster(kCurrentTime, 2048, 7, 6);
    meta.oldest_ancester_time = kCurrentTime - (1024 + 480);
    ASSERT_EQ(1, booster.GetBoostScore(&meta));
    meta.oldest_ancester_time = kCurrentTime - (1024 + 480 + 60);
    ASSERT_EQ(1, booster.GetBoostScore(&meta));
    meta.oldest_ancester_time = kCurrentTime - 3000;
    ASSERT_EQ(1, booster.GetBoostScore(&meta));
  }
}

TEST_F(CompactionPickerTest, NotScheduleL1IfL0WithHigherPri1) {
  NewVersionStorage(6, kCompactionStyleLevel);
  mutable_cf_options_.level0_file_num_compaction_trigger = 2;
db/compaction/file_pri.h (new file, 92 lines)
@@ -0,0 +1,92 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//

#pragma once
#include <algorithm>

#include "db/version_edit.h"

namespace ROCKSDB_NAMESPACE {
// We boost files that are closer to the TTL limit. This boosting could be
// done through FileMetaData.compensated_file_size, but that compensated size
// is widely used as something similar to file size, so dramatically boosting
// the value might cause unintended consequences.
//
// This boosting algorithm could get very fancy, but here we use a simple
// formula that satisfies:
// (1) Different levels are triggered slightly differently, to avoid
//     too many cascading cases.
// (2) Files in the same level get boosted more as TTL gets closer.
//
// Don't do any boosting before half of the TTL has passed. This keeps
// write amplification lower in the common case. All levels should be
// fully boosted by the time the overall TTL compaction threshold triggers.
// Each level's boosting range is half of the next level's, so the ranges
// increase exponentially across levels. We could make them equal, or go
// even fancier. We can adjust it after we observe the behavior in
// production.
// The thresholds where boosting starts:
//
//   +------------------------------------------------------------------+
//   ^                          ^           ^         ^
//   Age 0 ...                  |           |         second last level
//                              |           third last level
//                              fourth last level
//
// The boost starts at 0 when a file's age reaches boost_age_start and grows
// linearly. The ratio is arbitrarily set so that, when the next level starts
// to boost, the previous level's boosting amount is 16.
class FileTtlBooster {
 public:
  FileTtlBooster(uint64_t current_time, uint64_t ttl, int num_non_empty_levels,
                 int level)
      : current_time_(current_time) {
    if (ttl == 0 || level == 0 || level >= num_non_empty_levels - 1) {
      enabled_ = false;
      boost_age_start_ = 0;
      boost_step_ = 1;
    } else {
      enabled_ = true;
      uint64_t all_boost_start_age = ttl / 2;
      uint64_t all_boost_age_range = (ttl / 32) * 31 - all_boost_start_age;
      uint64_t boost_age_range =
          all_boost_age_range >> (num_non_empty_levels - level - 1);
      boost_age_start_ = all_boost_start_age + boost_age_range;
      const uint64_t kBoostRatio = 16;
      // Prevent a zero value to avoid a divide-by-zero error.
      boost_step_ = std::max(boost_age_range / kBoostRatio, uint64_t{1});
    }
  }

  uint64_t GetBoostScore(FileMetaData* f) {
    if (!enabled_) {
      return 1;
    }
    uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
    if (oldest_ancester_time >= current_time_) {
      return 1;
    }
    uint64_t age = current_time_ - oldest_ancester_time;
    if (age > boost_age_start_) {
      // Use an integer just for convenience.
      // We could make all file_to_order values double if we wanted to.
      // Technically this can overflow if users override timing and
      // give a very high current time. Ignore that case for simplicity.
      // Boosting is an addition to the current value, so +1. This effectively
      // makes boosting kick in after the first boost_step_ is reached.
      return (age - boost_age_start_) / boost_step_ + 1;
    }
    return 1;
  }

 private:
  bool enabled_;
  uint64_t current_time_;
  uint64_t boost_age_start_;
  uint64_t boost_step_;
};
}  // namespace ROCKSDB_NAMESPACE
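A hypothetical usage sketch of the class above, plugging in the numbers from the FileTtlBooster unit test (TTL 2048, 7 non-empty levels, level 3):

#include "db/compaction/file_pri.h"
#include "db/version_edit.h"

uint64_t ScoreExample() {
  using namespace ROCKSDB_NAMESPACE;
  const uint64_t kNow = 1000000;
  FileTtlBooster booster(kNow, /*ttl=*/2048, /*num_non_empty_levels=*/7,
                         /*level=*/3);
  FileMetaData meta;
  // Boosting for L3 starts at age 1024 + 120 = 1144, with step 120 / 16 = 7.
  meta.oldest_ancester_time = kNow - (1144 + 30);
  return booster.GetBoostScore(&meta);  // (30 / 7) + 1 = 5
}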
db/db_compaction_test.cc
@@ -4302,6 +4302,67 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBCompactionTest, LevelTtlBooster) {
  const int kNumKeysPerFile = 32;
  const int kNumLevelFiles = 3;
  const int kValueSize = 1000;

  Options options = CurrentOptions();
  options.ttl = 10 * 60 * 60;                           // 10 hours
  options.periodic_compaction_seconds = 480 * 60 * 60;  // very long
  options.level0_file_num_compaction_trigger = 2;
  options.max_bytes_for_level_base = 5 * uint64_t{kNumKeysPerFile * kValueSize};
  options.max_open_files = -1;  // needed for both periodic and ttl compactions
  options.compaction_pri = CompactionPri::kMinOverlappingRatio;
  env_->SetMockSleep();
  options.env = env_;

  // NOTE: Presumed unnecessary and removed: resetting mock time in env

  DestroyAndReopen(options);

  Random rnd(301);
  for (int i = 0; i < kNumLevelFiles; ++i) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(
          Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  MoveFilesToLevel(2);

  ASSERT_EQ("0,0,3", FilesPerLevel());

  // Create some files for L1
  for (int i = 0; i < 2; i++) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(2 * j + i), rnd.RandomString(kValueSize)));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }

  ASSERT_EQ("0,1,3", FilesPerLevel());

  // Make the new L0 files qualify for TTL boosting and generate one more file
  // to trigger an L1 -> L2 compaction. The old files will be picked even
  // though their priority is lower without boosting.
  env_->MockSleepForSeconds(8 * 60 * 60);
  for (int i = 0; i < 2; i++) {
    for (int j = 0; j < kNumKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(kNumKeysPerFile * 2 + 2 * j + i),
                    rnd.RandomString(kValueSize * 2)));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ("0,1,2", FilesPerLevel());

  ASSERT_GT(SizeAtLevel(1), kNumKeysPerFile * 4 * kValueSize);
}

TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
  class TestCompactionFilter : public CompactionFilter {
    const char* Name() const override { return "TestCompactionFilter"; }
db/version_builder_test.cc
@@ -149,7 +149,7 @@ class VersionBuilderTest : public testing::Test {
  }

  void UpdateVersionStorageInfo() {
-    vstorage_.UpdateFilesByCompactionPri(ioptions_.compaction_pri);
+    vstorage_.UpdateFilesByCompactionPri(ioptions_, mutable_cf_options_);
    vstorage_.UpdateNumNonEmptyLevels();
    vstorage_.GenerateFileIndexer();
    vstorage_.GenerateLevelFilesBrief();
db/version_set.cc
@@ -20,12 +20,13 @@
#include <unordered_map>
#include <vector>

#include "db/blob/blob_fetcher.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/compaction/compaction.h"
#include "db/compaction/file_pri.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
@@ -2426,7 +2427,8 @@ void Version::PrepareApply(
  UpdateAccumulatedStats(update_stats);
  storage_info_.UpdateNumNonEmptyLevels();
  storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
-  storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
+  storage_info_.UpdateFilesByCompactionPri(*cfd_->ioptions(),
+                                           mutable_cf_options);
  storage_info_.GenerateFileIndexer();
  storage_info_.GenerateLevelFilesBrief();
  storage_info_.GenerateLevel0NonOverlapping();
@@ -3138,11 +3140,22 @@ namespace {
// Sort `temp` based on ratio of overlapping size over file size
void SortFileByOverlappingRatio(
    const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
-    const std::vector<FileMetaData*>& next_level_files,
+    const std::vector<FileMetaData*>& next_level_files, SystemClock* clock,
+    int level, int num_non_empty_levels, uint64_t ttl,
    std::vector<Fsize>* temp) {
  std::unordered_map<uint64_t, uint64_t> file_to_order;
  auto next_level_it = next_level_files.begin();

  int64_t curr_time;
  Status status = clock->GetCurrentTime(&curr_time);
  if (!status.ok()) {
    // If we can't get the time, disable TTL.
    ttl = 0;
  }

  FileTtlBooster ttl_booster(static_cast<uint64_t>(curr_time), ttl,
                             num_non_empty_levels, level);

  for (auto& file : files) {
    uint64_t overlapping_bytes = 0;
    // Skip files in the next level that are smaller than the current file
@@ -3162,9 +3175,12 @@ void SortFileByOverlappingRatio(
      next_level_it++;
    }

    uint64_t ttl_boost_score = (ttl > 0) ? ttl_booster.GetBoostScore(file) : 1;
    assert(ttl_boost_score > 0);
    assert(file->compensated_file_size != 0);
-    file_to_order[file->fd.GetNumber()] =
-        overlapping_bytes * 1024u / file->compensated_file_size;
+    file_to_order[file->fd.GetNumber()] = overlapping_bytes * 1024U /
+                                          file->compensated_file_size /
+                                          ttl_boost_score;
  }

  std::sort(temp->begin(), temp->end(),
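For intuition on the new priority value, a small worked example with illustrative numbers (not from the commit). A lower file_to_order means the file is picked sooner, and the TTL boost divides the score down for old files:

#include <cstdint>

// Same formula as above: overlap ratio scaled by 1024, divided by the
// TTL boost so old files sort ahead of equally-overlapping young ones.
uint64_t FileOrder(uint64_t overlapping_bytes, uint64_t compensated_file_size,
                   uint64_t ttl_boost_score) {
  return overlapping_bytes * 1024U / compensated_file_size / ttl_boost_score;
}

// Example: 32 MB of overlap over a 64 MB file gives 512 unboosted; with
// ttl_boost_score == 4 the same file scores 128 and is picked earlier.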
@@ -3176,7 +3192,7 @@
} // namespace

void VersionStorageInfo::UpdateFilesByCompactionPri(
-    CompactionPri compaction_pri) {
+    const ImmutableOptions& ioptions, const MutableCFOptions& options) {
  if (compaction_style_ == kCompactionStyleNone ||
      compaction_style_ == kCompactionStyleFIFO ||
      compaction_style_ == kCompactionStyleUniversal) {
@@ -3201,7 +3217,7 @@ void VersionStorageInfo::UpdateFilesByCompactionPri(
    if (num > temp.size()) {
      num = temp.size();
    }
-    switch (compaction_pri) {
+    switch (ioptions.compaction_pri) {
      case kByCompensatedSize:
        std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
                          CompareCompensatedSizeDescending);
@@ -3222,7 +3238,8 @@ void VersionStorageInfo::UpdateFilesByCompactionPri(
        break;
      case kMinOverlappingRatio:
        SortFileByOverlappingRatio(*internal_comparator_, files_[level],
-                                   files_[level + 1], &temp);
+                                   files_[level + 1], ioptions.clock, level,
+                                   num_non_empty_levels_, options.ttl, &temp);
        break;
      default:
        assert(false);
db/version_set.h
@@ -197,7 +197,8 @@ class VersionStorageInfo {
  // Sort all files for this version based on their file size and
  // record results in files_by_compaction_pri_. The largest files are listed
  // first.
-  void UpdateFilesByCompactionPri(CompactionPri compaction_pri);
+  void UpdateFilesByCompactionPri(const ImmutableOptions& immutable_options,
+                                  const MutableCFOptions& mutable_cf_options);

  void GenerateLevel0NonOverlapping();
  bool level0_non_overlapping() const {
db/version_set_test.cc
@@ -177,7 +177,7 @@ class VersionStorageInfoTestBase : public testing::Test {
  void Finalize() {
    vstorage_.UpdateNumNonEmptyLevels();
    vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
-    vstorage_.UpdateFilesByCompactionPri(ioptions_.compaction_pri);
+    vstorage_.UpdateFilesByCompactionPri(ioptions_, mutable_cf_options_);
    vstorage_.GenerateFileIndexer();
    vstorage_.GenerateLevelFilesBrief();
    vstorage_.GenerateLevel0NonOverlapping();
tools/db_bench_tool.cc
@@ -808,6 +808,8 @@ DEFINE_uint64(periodic_compaction_seconds,
              "Files older than this will be picked up for compaction and"
              " rewritten to the same level");

DEFINE_uint64(ttl_seconds, ROCKSDB_NAMESPACE::Options().ttl, "Set options.ttl");

static bool ValidateInt32Percent(const char* flagname, int32_t value) {
  if (value <= 0 || value>=100) {
    fprintf(stderr, "Invalid value for --%s: %d, 0< pct <100 \n",
@@ -4285,7 +4287,7 @@ class Benchmark {
    options.disable_auto_compactions = FLAGS_disable_auto_compactions;
    options.optimize_filters_for_hits = FLAGS_optimize_filters_for_hits;
    options.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
    options.ttl = FLAGS_ttl_seconds;
    // fill storage options
    options.advise_random_on_open = FLAGS_advise_random_on_open;
    options.access_hint_on_compaction_start = FLAGS_compaction_fadvice_e;