Allowing L0 -> L1 trivial move on sorted data
Summary: This diff updates the trivial-move logic. A trivial move can now run on any number of files in the input level, as long as those files are not overlapping. The conditions for trivial move have been updated; the combined eligibility check is sketched below.

Introduced conditions:
- Trivial move cannot happen if the compaction is manual and we have a compaction filter
- Input level files cannot be overlapping

Removed conditions:
- Trivial move only runs when the compaction is not manual
- Input level can contain only 1 file

More context on the tests that failed because of trivial move:
```
DBTest.CompactionsGenerateMultipleFiles
This test expects a compaction of one file in L0 to generate multiple files in L1. It fails with trivial move because we end up with a single file in L1.
```
```
DBTest.NoSpaceCompactRange
This test expects compaction to fail when we force the environment to report running out of space. That is not valid in the trivial-move case, because a trivial move does not need any extra space and the test did not check for that.
```
```
DBTest.DropWrites
Similar to DBTest.NoSpaceCompactRange.
```
```
DBTest.DeleteObsoleteFilesPendingOutputs
This test expects a file in L2 to be deleted after it is moved to L3. That is not valid with trivial move: although the file was moved, it is now used by L3.
```
```
CuckooTableDBTest.CompactionIntoMultipleFiles
Same as DBTest.CompactionsGenerateMultipleFiles.
```
This diff is based on work by @sdong in https://reviews.facebook.net/D34149

Test Plan: make -j64 check

Reviewers: rven, sdong, igor

Reviewed By: igor

Subscribers: yhchiang, ott, march, dhruba, sdong

Differential Revision: https://reviews.facebook.net/D34797
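For orientation, here is a minimal sketch of that combined eligibility check. The function and parameter names are illustrative stand-ins, not RocksDB identifiers; the real logic lives in Compaction::IsTrivialMove() in the diff below.

```cpp
// Sketch only: a simplified stand-in for Compaction::IsTrivialMove().
// Every parameter here is a hypothetical flag, not a real RocksDB type.
bool IsTrivialMoveEligible(bool is_manual, bool has_compaction_filter,
                           bool start_is_l0, bool l0_files_overlap,
                           int start_level, int output_level,
                           bool same_path_and_compression,
                           bool small_grandparent_overlap) {
  // A manual compaction with a compaction filter must rewrite the data;
  // a pure file move would silently skip the filter.
  if (is_manual && has_compaction_filter) return false;
  // Overlapping L0 files cannot be moved down as-is, because every level
  // below L0 must hold sorted, non-overlapping files.
  if (start_is_l0 && l0_files_overlap) return false;
  // Otherwise the move must actually change levels, keep the same output
  // path and compression, and not overlap too much of the grandparent level.
  return start_level != output_level && same_path_and_compression &&
         small_grandparent_overlap;
}
```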
Parent: bb808eaddb
Commit: 3ce3bb3da2
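The test-facing piece of the change, used throughout the updated tests below, is a new disallow_trivial_move flag threaded through TEST_CompactRange and RunManualCompaction. A typical call, as it appears in the diff:

```cpp
// Force a real compaction in a test even when a trivial move would apply.
// The extra parameters default to nullptr/false, preserving old behavior.
dbfull()->TEST_CompactRange(0 /* level */, nullptr /* begin */,
                            nullptr /* end */, nullptr /* column family */,
                            true /* disallow trivial move */);
```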
@@ -16,6 +16,7 @@
 #include <inttypes.h>
 #include <vector>

+#include "rocksdb/compaction_filter.h"
 #include "db/column_family.h"
 #include "util/logging.h"
 #include "util/sync_point.h"
@@ -148,9 +149,27 @@ bool Compaction::IsTrivialMove() const {
   // Otherwise, the move could create a parent file that will require
   // a very expensive merge later on.
   // If start_level_== output_level_, the purpose is to force compaction
-  // filter to be applied to that level, and thus cannot be a trivia move.
+  // filter to be applied to that level, and thus cannot be a trivial move.
+
+  // Check if start level have files with overlapping ranges
+  if (start_level_ == 0 &&
+      input_version_->storage_info()->level0_non_overlapping() == false) {
+    // We cannot move files from L0 to L1 if the files are overlapping
+    return false;
+  }
+
+  if (is_manual_compaction_ &&
+      (cfd_->ioptions()->compaction_filter != nullptr ||
+       !dynamic_cast<DefaultCompactionFilterFactory*>(
+           cfd_->ioptions()->compaction_filter_factory) ||
+       !dynamic_cast<DefaultCompactionFilterFactoryV2*>(
+           cfd_->ioptions()->compaction_filter_factory_v2))) {
+    // This is a manual compaction and we have a compaction filter that should
+    // be executed, we cannot do a trivial move
+    return false;
+  }
+
   return (start_level_ != output_level_ && num_input_levels() == 1 &&
-          num_input_files(0) == 1 &&
           input(0, 0)->fd.GetPathId() == GetOutputPathId() &&
           InputCompressionMatchesOutput() &&
           TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
@@ -340,4 +359,22 @@ uint64_t Compaction::OutputFilePreallocationSize() {
   return preallocation_size * 1.1;
 }

+std::unique_ptr<CompactionFilter> Compaction::CreateCompactionFilter() const {
+  CompactionFilter::Context context;
+  context.is_full_compaction = is_full_compaction_;
+  context.is_manual_compaction = is_manual_compaction_;
+  return cfd_->ioptions()->compaction_filter_factory->CreateCompactionFilter(
+      context);
+}
+
+std::unique_ptr<CompactionFilterV2>
+Compaction::CreateCompactionFilterV2() const {
+  CompactionFilterContext context;
+  context.is_full_compaction = is_full_compaction_;
+  context.is_manual_compaction = is_manual_compaction_;
+  return
+      cfd_->ioptions()->compaction_filter_factory_v2->CreateCompactionFilterV2(
+          context);
+}
+
 }  // namespace rocksdb
@@ -29,6 +29,8 @@ struct CompactionInputFiles {
 class Version;
 class ColumnFamilyData;
 class VersionStorageInfo;
+class CompactionFilter;
+class CompactionFilterV2;

 // A Compaction encapsulates information about a compaction.
 class Compaction {
@@ -179,6 +181,12 @@ class Compaction {
   // to pick up the next file to be compacted from files_by_size_
   void ResetNextCompactionIndex();

+  // Create a CompactionFilter from compaction_filter_factory
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter() const;
+
+  // Create a CompactionFilterV2 from compaction_filter_factory_v2
+  std::unique_ptr<CompactionFilterV2> CreateCompactionFilterV2() const;
+
  private:
   // mark (or clear) all files that are being compacted
   void MarkFilesBeingCompacted(bool mark_as_compacted);
@@ -83,22 +83,6 @@ struct CompactionJob::CompactionState {
         num_input_records(0),
         num_output_records(0) {}

-  // Create a client visible context of this compaction
-  CompactionFilter::Context GetFilterContextV1() {
-    CompactionFilter::Context context;
-    context.is_full_compaction = compaction->IsFullCompaction();
-    context.is_manual_compaction = compaction->IsManualCompaction();
-    return context;
-  }
-
-  // Create a client visible context of this compaction
-  CompactionFilterContext GetFilterContext() {
-    CompactionFilterContext context;
-    context.is_full_compaction = compaction->IsFullCompaction();
-    context.is_manual_compaction = compaction->IsManualCompaction();
-    return context;
-  }
-
   std::vector<std::string> key_str_buf_;
   std::vector<std::string> existing_value_str_buf_;
   // new_value_buf_ will only be appended if a value changes
@@ -360,11 +344,7 @@ Status CompactionJob::Run() {
   Status status;
   ParsedInternalKey ikey;
   std::unique_ptr<CompactionFilterV2> compaction_filter_from_factory_v2 =
-      nullptr;
-  auto context = compact_->GetFilterContext();
-  compaction_filter_from_factory_v2 =
-      cfd->ioptions()->compaction_filter_factory_v2->CreateCompactionFilterV2(
-          context);
+      compact_->compaction->CreateCompactionFilterV2();
   auto compaction_filter_v2 = compaction_filter_from_factory_v2.get();

   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
@@ -629,10 +609,8 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros,
   auto compaction_filter = cfd->ioptions()->compaction_filter;
   std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
   if (!compaction_filter) {
-    auto context = compact_->GetFilterContextV1();
     compaction_filter_from_factory =
-        cfd->ioptions()->compaction_filter_factory->CreateCompactionFilter(
-            context);
+        compact_->compaction->CreateCompactionFilter();
     compaction_filter = compaction_filter_from_factory.get();
   }

@@ -321,6 +321,11 @@ class CompactionJobStatsTest : public testing::Test {
     ASSERT_OK(db_->CompactRange(&start, &limit));
   }

+  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
+    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
+                                          true /* disallow trivial move */));
+  }
+
   // Do n memtable compactions, each of which produces an sstable
   // covering the range [small,large].
   void MakeTables(int n, const std::string& small, const std::string& large,
@@ -588,7 +593,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
         1, num_keys_per_L0_file,
         compression_ratio, 0));
     ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
-    Compact(1, smallest_key, largest_key);
+    TEST_Compact(0, 1, smallest_key, largest_key);
     snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
     ASSERT_EQ(std::string(buf), FilesPerLevel(1));
   }
@@ -606,7 +611,7 @@ TEST_F(CompactionJobStatsTest, CompactionJobStatsTest) {
       1, num_keys_per_L0_file * num_remaining_L0,
       compression_ratio, 0));
   ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
-  Compact(1, smallest_key, largest_key);
+  TEST_Compact(0, 1, smallest_key, largest_key);

   int num_L1_files = num_L0_files - num_remaining_L0 + 1;
   num_L0_files = 0;
@@ -90,6 +90,7 @@ class CompactionPickerTest : public testing::Test {
     vstorage_->GenerateFileIndexer();
     vstorage_->GenerateLevelFilesBrief();
     vstorage_->ComputeCompactionScore(mutable_cf_options_, fifo_options_);
+    vstorage_->GenerateLevel0NonOverlapping();
     vstorage_->SetFinalized();
   }
 };
@@ -243,7 +243,8 @@ TEST_F(CuckooTableDBTest, CompactionIntoMultipleFiles) {
   dbfull()->TEST_WaitForFlushMemTable();
   ASSERT_EQ("1", FilesPerLevel());

-  dbfull()->TEST_CompactRange(0, nullptr, nullptr);
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
+                              true /* disallow trivial move */);
   ASSERT_EQ("0,2", FilesPerLevel());
   for (int idx = 0; idx < 28; ++idx) {
     ASSERT_EQ(std::string(10000, 'a' + idx), Get(Key(idx)));
@@ -1825,7 +1825,8 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const {

 Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                    int output_level, uint32_t output_path_id,
-                                   const Slice* begin, const Slice* end) {
+                                   const Slice* begin, const Slice* end,
+                                   bool disallow_trivial_move) {
   assert(input_level == ColumnFamilyData::kCompactAllLevels ||
          input_level >= 0);

@@ -1838,6 +1839,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
   manual.output_path_id = output_path_id;
   manual.done = false;
   manual.in_progress = false;
+  manual.disallow_trivial_move = disallow_trivial_move;
   // For universal compaction, we enforce every manual compaction to compact
   // all files.
   if (begin == nullptr ||
@@ -2271,6 +2273,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,

   bool is_manual = (manual_compaction_ != nullptr) &&
                    (manual_compaction_->in_progress == false);
+  bool trivial_move_disallowed = is_manual &&
+                                 manual_compaction_->disallow_trivial_move;

   CompactionJobStats compaction_job_stats;
   Status status = bg_error_;
@@ -2431,7 +2435,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
         c->column_family_data()->GetName().c_str(),
         c->num_input_files(0));
     *madeProgress = true;
-  } else if (!is_manual && c->IsTrivialMove()) {
+  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
     // Instrument for event update
     // TODO(yhchiang): add op details for showing trivial-move.
@@ -2440,13 +2444,23 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,

     compaction_job_stats.num_input_files = c->num_input_files(0);

-    // Move file to next level
-    assert(c->num_input_files(0) == 1);
-    FileMetaData* f = c->input(0, 0);
-    c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
-    c->edit()->AddFile(c->output_level(), f->fd.GetNumber(), f->fd.GetPathId(),
-                       f->fd.GetFileSize(), f->smallest, f->largest,
-                       f->smallest_seqno, f->largest_seqno);
+    // Move files to next level
+    int32_t moved_files = 0;
+    int64_t moved_bytes = 0;
+    for (size_t i = 0; i < c->num_input_files(0); i++) {
+      FileMetaData* f = c->input(0, i);
+      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
+      c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
+                         f->fd.GetFileSize(), f->smallest, f->largest,
+                         f->smallest_seqno, f->largest_seqno);
+
+      LogToBuffer(log_buffer,
+                  "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
+                  c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
+                  c->level() + 1, f->fd.GetFileSize());
+      ++moved_files;
+      moved_bytes += f->fd.GetFileSize();
+    }
     status = versions_->LogAndApply(c->column_family_data(),
                                     *c->mutable_cf_options(), c->edit(),
                                     &mutex_, directories_.GetDbDir());
@@ -2455,20 +2469,20 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
                                     *c->mutable_cf_options());

     VersionStorageInfo::LevelSummaryStorage tmp;
-    c->column_family_data()->internal_stats()->IncBytesMoved(
-        c->level() + 1, f->fd.GetFileSize());
+    c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1,
+                                                             moved_bytes);
     {
       event_logger_.LogToBuffer(log_buffer)
           << "job" << job_context->job_id << "event"
           << "trivial_move"
-          << "destination_level" << c->level() + 1 << "file_number"
-          << f->fd.GetNumber() << "file_size" << f->fd.GetFileSize();
+          << "destination_level" << c->level() + 1 << "files" << moved_files
+          << "total_files_size" << moved_bytes;
     }
     LogToBuffer(
         log_buffer,
-        "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes %s: %s\n",
-        c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
-        c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(),
+        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
+        c->column_family_data()->GetName().c_str(), moved_files, c->level() + 1,
+        moved_bytes, status.ToString().c_str(),
         c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
     *madeProgress = true;
db/db_impl.h
@@ -233,7 +233,8 @@ class DBImpl : public DB {

   Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                              int output_level, uint32_t output_path_id,
-                             const Slice* begin, const Slice* end);
+                             const Slice* begin, const Slice* end,
+                             bool disallow_trivial_move = false);

 #ifndef ROCKSDB_LITE
   // Extra methods (for testing) that are not in the public DB interface
@@ -241,7 +242,8 @@ class DBImpl : public DB {

   // Compact any files in the named level that overlap [*begin, *end]
   Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
-                           ColumnFamilyHandle* column_family = nullptr);
+                           ColumnFamilyHandle* column_family = nullptr,
+                           bool disallow_trivial_move = false);

   // Force current memtable contents to be flushed.
   Status TEST_FlushMemTable(bool wait = true);
@@ -640,10 +642,11 @@ class DBImpl : public DB {
     uint32_t output_path_id;
     bool done;
     Status status;
-    bool in_progress;           // compaction request being processed?
-    const InternalKey* begin;   // nullptr means beginning of key range
-    const InternalKey* end;     // nullptr means end of key range
-    InternalKey tmp_storage;    // Used to keep track of compaction progress
+    bool in_progress;            // compaction request being processed?
+    const InternalKey* begin;    // nullptr means beginning of key range
+    const InternalKey* end;      // nullptr means end of key range
+    InternalKey tmp_storage;     // Used to keep track of compaction progress
+    bool disallow_trivial_move;  // Force actual compaction to run
   };
   ManualCompaction* manual_compaction_;

@@ -73,7 +73,8 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() {

 Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
                                  const Slice* end,
-                                 ColumnFamilyHandle* column_family) {
+                                 ColumnFamilyHandle* column_family,
+                                 bool disallow_trivial_move) {
   ColumnFamilyData* cfd;
   if (column_family == nullptr) {
     cfd = default_cf_handle_->cfd();
@@ -86,7 +87,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
        cfd->ioptions()->compaction_style == kCompactionStyleFIFO)
           ? level
           : level + 1;
-  return RunManualCompaction(cfd, level, output_level, 0, begin, end);
+  return RunManualCompaction(cfd, level, output_level, 0, begin, end,
+                             disallow_trivial_move);
 }

 Status DBImpl::TEST_FlushMemTable(bool wait) {
db/db_test.cc
@@ -1496,6 +1496,7 @@ TEST_F(DBTest, CompactedDB) {
   ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
   Flush();
   ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
+  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
   Flush();
   Close();

@@ -1509,7 +1510,6 @@ TEST_F(DBTest, CompactedDB) {
   // Full compaction
   Reopen(options);
   // Add more keys
-  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
   ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
   ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
   ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
@@ -2049,7 +2049,8 @@ TEST_F(DBTest, KeyMayExist) {
   ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));

   ASSERT_OK(Flush(1));
-  db_->CompactRange(handles_[1], nullptr, nullptr);
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+                              true /* disallow trivial move */);

   numopen = TestGetTickerCount(options, NO_FILE_OPENS);
   cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
@@ -3736,7 +3737,8 @@ TEST_F(DBTest, CompactionsGenerateMultipleFiles) {

   // Reopening moves updates to level-0
   ReopenWithColumnFamilies({"default", "pikachu"}, options);
-  dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]);
+  dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+                              true /* disallow trivial move */);

   ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
   ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
@@ -3745,6 +3747,154 @@ TEST_F(DBTest, CompactionsGenerateMultipleFiles) {
   }
 }

+TEST_F(DBTest, TrivialMoveOneFile) {
+  int32_t trivial_move = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:TrivialMove",
+      [&](void* arg) { trivial_move++; });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options;
+  options.write_buffer_size = 100000000;
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+
+  int32_t num_keys = 80;
+  int32_t value_size = 100 * 1024;  // 100 KB
+
+  Random rnd(301);
+  std::vector<std::string> values;
+  for (int i = 0; i < num_keys; i++) {
+    values.push_back(RandomString(&rnd, value_size));
+    ASSERT_OK(Put(Key(i), values[i]));
+  }
+
+  // Reopening moves updates to L0
+  Reopen(options);
+  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1);  // 1 file in L0
+  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // 0 files in L1
+
+  std::vector<LiveFileMetaData> metadata;
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(metadata.size(), 1U);
+  LiveFileMetaData level0_file = metadata[0];  // L0 file meta
+
+  // Compaction will initiate a trivial move from L0 to L1
+  dbfull()->CompactRange(nullptr, nullptr);
+
+  // File moved From L0 to L1
+  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);  // 0 files in L0
+  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // 1 file in L1
+
+  metadata.clear();
+  db_->GetLiveFilesMetaData(&metadata);
+  ASSERT_EQ(metadata.size(), 1U);
+  ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name);
+  ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size);
+
+  for (int i = 0; i < num_keys; i++) {
+    ASSERT_EQ(Get(Key(i)), values[i]);
+  }
+
+  ASSERT_EQ(trivial_move, 1);
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest, TrivialMoveNonOverlappingFiles) {
+  int32_t trivial_move = 0;
+  int32_t non_trivial_move = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:TrivialMove",
+      [&](void* arg) { trivial_move++; });
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:NonTrivial",
+      [&](void* arg) { non_trivial_move++; });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 10 * 1024 * 1024;
+
+  DestroyAndReopen(options);
+  // non overlapping ranges
+  std::vector<std::pair<int32_t, int32_t>> ranges = {
+      {100, 199},
+      {300, 399},
+      {0, 99},
+      {200, 299},
+      {600, 699},
+      {400, 499},
+      {500, 550},
+      {551, 599},
+  };
+  int32_t value_size = 10 * 1024;  // 10 KB
+
+  Random rnd(301);
+  std::map<int32_t, std::string> values;
+  for (uint32_t i = 0; i < ranges.size(); i++) {
+    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
+      values[j] = RandomString(&rnd, value_size);
+      ASSERT_OK(Put(Key(j), values[j]));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  int32_t level0_files = NumTableFilesAtLevel(0, 0);
+  ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
+  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0);  // No files in L1
+
+  // Since data is non-overlapping we expect compaction to initiate
+  // a trivial move
+  db_->CompactRange(nullptr, nullptr);
+  // We expect that all the files were trivially moved from L0 to L1
+  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
+
+  for (uint32_t i = 0; i < ranges.size(); i++) {
+    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
+      ASSERT_EQ(Get(Key(j)), values[j]);
+    }
+  }
+
+  ASSERT_EQ(trivial_move, 1);
+  ASSERT_EQ(non_trivial_move, 0);
+
+  trivial_move = 0;
+  non_trivial_move = 0;
+  values.clear();
+  DestroyAndReopen(options);
+  // Same ranges as above but overlapping
+  ranges = {
+      {100, 199},
+      {300, 399},
+      {0, 99},
+      {200, 299},
+      {600, 699},
+      {400, 499},
+      {500, 560},  // this range overlap with the next one
+      {551, 599},
+  };
+  for (uint32_t i = 0; i < ranges.size(); i++) {
+    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
+      values[j] = RandomString(&rnd, value_size);
+      ASSERT_OK(Put(Key(j), values[j]));
+    }
+    ASSERT_OK(Flush());
+  }
+
+  db_->CompactRange(nullptr, nullptr);
+
+  for (uint32_t i = 0; i < ranges.size(); i++) {
+    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
+      ASSERT_EQ(Get(Key(j)), values[j]);
+    }
+  }
+  ASSERT_EQ(trivial_move, 0);
+  ASSERT_EQ(non_trivial_move, 1);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBTest, CompactionTrigger) {
   Options options;
   options.write_buffer_size = 100<<10; //100KB
@@ -7194,7 +7344,8 @@ TEST_F(DBTest, DropWrites) {
         if (level > 0 && level == dbfull()->NumberLevels() - 1) {
           break;
         }
-        dbfull()->TEST_CompactRange(level, nullptr, nullptr);
+        dbfull()->TEST_CompactRange(level, nullptr, nullptr, nullptr,
+                                    true /* disallow trivial move */);
       }
     } else {
       dbfull()->CompactRange(nullptr, nullptr);
@@ -7257,7 +7408,8 @@ TEST_F(DBTest, NoSpaceCompactRange) {
   // Force out-of-space errors
   env_->no_space_.store(true, std::memory_order_release);

-  Status s = db_->CompactRange(nullptr, nullptr);
+  Status s = dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
+                                         true /* disallow trivial move */);
   ASSERT_TRUE(s.IsIOError());

   env_->no_space_.store(false, std::memory_order_release);
@@ -12489,7 +12641,8 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
   auto file_on_L2 = metadata[0].name;
   listener->SetExpectedFileName(dbname_ + file_on_L2);

-  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr,
+                                        true /* disallow trivial move */));
   ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));

   // finish the flush!
@@ -12502,7 +12655,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
   db_->GetLiveFilesMetaData(&metadata);
   ASSERT_EQ(metadata.size(), 2U);

-  // This file should have been deleted
+  // This file should have been deleted during last compaction
   ASSERT_TRUE(!env_->FileExists(dbname_ + file_on_L2));
   listener->VerifyMatchedCount(1);
 }
@@ -82,6 +82,7 @@ class VersionBuilderTest : public testing::Test {
     vstorage_.GenerateFileIndexer();
     vstorage_.GenerateLevelFilesBrief();
     vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_);
+    vstorage_.GenerateLevel0NonOverlapping();
     vstorage_.SetFinalized();
   }
 };
@@ -733,6 +733,7 @@ VersionStorageInfo::VersionStorageInfo(
       files_(new std::vector<FileMetaData*>[num_levels_]),
       base_level_(num_levels_ == 1 ? -1 : 1),
       files_by_size_(num_levels_),
+      level0_non_overlapping_(false),
       next_file_to_compact_by_size_(num_levels_),
       compaction_score_(num_levels_),
       compaction_level_(num_levels_),
@@ -871,6 +872,7 @@ void Version::PrepareApply(const MutableCFOptions& mutable_cf_options) {
   storage_info_.UpdateFilesBySize();
   storage_info_.GenerateFileIndexer();
   storage_info_.GenerateLevelFilesBrief();
+  storage_info_.GenerateLevel0NonOverlapping();
 }

 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
@@ -1121,6 +1123,7 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
 // 3. UpdateFilesBySize();
 // 4. GenerateFileIndexer();
 // 5. GenerateLevelFilesBrief();
+// 6. GenerateLevel0NonOverlapping();
 void VersionStorageInfo::SetFinalized() {
   finalized_ = true;
 #ifndef NDEBUG
@@ -1206,6 +1209,33 @@ void VersionStorageInfo::UpdateFilesBySize() {
   }
 }

+void VersionStorageInfo::GenerateLevel0NonOverlapping() {
+  assert(!finalized_);
+  level0_non_overlapping_ = true;
+  if (level_files_brief_.size() == 0) {
+    return;
+  }
+
+  // A copy of L0 files sorted by smallest key
+  std::vector<FdWithKeyRange> level0_sorted_file(
+      level_files_brief_[0].files,
+      level_files_brief_[0].files + level_files_brief_[0].num_files);
+  sort(level0_sorted_file.begin(), level0_sorted_file.end(),
+       [this](FdWithKeyRange& f1, FdWithKeyRange& f2) -> bool {
+         return (internal_comparator_->Compare(f1.smallest_key,
+                                               f2.smallest_key) < 0);
+       });
+
+  for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
+    FdWithKeyRange& f = level0_sorted_file[i];
+    FdWithKeyRange& prev = level0_sorted_file[i - 1];
+    if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
+      level0_non_overlapping_ = false;
+      break;
+    }
+  }
+}
+
 void Version::Ref() {
   ++refs_;
 }
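The scan above is easy to exercise in isolation. Here is a self-contained sketch of the same adjacent-overlap check, with plain std::string keys and a simple struct standing in for RocksDB's FdWithKeyRange and internal key comparator:

```cpp
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

struct KeyRange {
  std::string smallest;
  std::string largest;
};

// Sort by smallest key, then compare each range's start against its
// predecessor's end -- the same shape as GenerateLevel0NonOverlapping().
bool NonOverlapping(std::vector<KeyRange> files) {
  std::sort(files.begin(), files.end(),
            [](const KeyRange& a, const KeyRange& b) {
              return a.smallest < b.smallest;
            });
  for (size_t i = 1; i < files.size(); ++i) {
    if (files[i - 1].largest >= files[i].smallest) {
      return false;  // predecessor reaches into this range
    }
  }
  return true;
}

int main() {
  // Disjoint ranges (like the first half of TrivialMoveNonOverlappingFiles
  // above): prints 1, so a trivial move would be allowed.
  std::cout << NonOverlapping({{"100", "199"}, {"300", "399"}, {"200", "299"}})
            << "\n";
  // {500,560} overlaps {551,599}: prints 0, so a real compaction is needed.
  std::cout << NonOverlapping({{"500", "560"}, {"551", "599"}}) << "\n";
}
```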
@@ -131,6 +131,11 @@ class VersionStorageInfo {
   // record results in files_by_size_. The largest files are listed first.
   void UpdateFilesBySize();

+  void GenerateLevel0NonOverlapping();
+  bool level0_non_overlapping() const {
+    return level0_non_overlapping_;
+  }
+
   int MaxInputLevel() const;

   // Returns the maxmimum compaction score for levels 1 to max
@@ -343,6 +348,9 @@ class VersionStorageInfo {
   // This vector stores the index of the file from files_.
   std::vector<std::vector<int>> files_by_size_;

+  // If true, means that files in L0 have keys with non overlapping ranges
+  bool level0_non_overlapping_;
+
   // An index into files_by_size_ that specifies the first
   // file that is not yet compacted
   std::vector<int> next_file_to_compact_by_size_;