Compacting column families

Summary: This diff enables compaction of non-default column families, both automatically and via explicit CompactRange() calls

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15813
Igor Canadi 2014-01-31 16:45:20 -08:00
parent 5661ed8b80
commit 27a8856c23
6 changed files with 177 additions and 64 deletions
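For context, a minimal sketch of the manual half of this change, written against the CompactRange() signature visible in the db/db_impl.cc hunks below. The wrapper function and the open/handle setup are hypothetical; only the shape of the CompactRange() call comes from this diff:

  #include <cassert>
  #include "rocksdb/db.h"

  // Hypothetical helper: compact the key range ["p", "q"] in a
  // non-default column family. CompactRange() first flushes the CF's
  // memtable, then runs manual compactions level by level (see
  // DBImpl::CompactRange in db/db_impl.cc below).
  void CompactRangeInCF(rocksdb::DB* db,
                        const rocksdb::ColumnFamilyHandle& cf) {
    rocksdb::Slice begin("p");
    rocksdb::Slice end("q");
    rocksdb::Status s = db->CompactRange(cf, &begin, &end);
    assert(s.ok());
  }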

db/column_family_test.cc

@@ -83,6 +83,46 @@ class ColumnFamilyTest {
return result;
}
void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
}
int NumTableFilesAtLevel(int cf, int level) {
string property;
ASSERT_TRUE(db_->GetProperty(
handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
}
// Return spread of files per level
string FilesPerLevel(int cf) {
string result;
int last_non_zero_offset = 0;
for (int level = 0; level < column_family_options_.num_levels; level++) {
int f = NumTableFilesAtLevel(cf, level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
// Do n memtable flushes, each of which produces an sstable
// covering the range [small,large].
void MakeTables(int cf, int n, const string& small,
const string& large) {
for (int i = 0; i < n; i++) {
ASSERT_OK(Put(cf, small, "begin"));
ASSERT_OK(Put(cf, large, "end"));
ASSERT_OK(db_->Flush(FlushOptions(), handles_[cf]));
}
}
void CopyFile(const string& source, const string& destination,
uint64_t size = 0) {
const EnvOptions soptions;
@@ -111,7 +151,7 @@ class ColumnFamilyTest {
ColumnFamilyOptions column_family_options_;
DBOptions db_options_;
string dbname_;
DB* db_;
DB* db_ = nullptr;
Env* env_;
};
@@ -274,6 +314,52 @@ TEST(ColumnFamilyTest, FlushTest) {
Close();
}
// This is the same as DBTest::ManualCompaction, but it runs all
// operations on a non-default column family
TEST(ColumnFamilyTest, ManualCompaction) {
// iter - 0 with 3 levels
// iter - 1 with 7 levels
int cf = 1;
for (int iter = 0; iter < 2; ++iter) {
column_family_options_.num_levels = (iter == 0) ? 3 : 7;
Destroy();
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"one"});
Close();
ASSERT_OK(Open({"default", "one"}));
MakeTables(cf, 3, "p", "q");
ASSERT_EQ("1,1,1", FilesPerLevel(cf));
// Compaction range falls before files
Compact(cf, "", "c");
ASSERT_EQ("1,1,1", FilesPerLevel(cf));
// Compaction range falls after files
Compact(cf, "r", "z");
ASSERT_EQ("1,1,1", FilesPerLevel(cf));
// Compaction range overlaps files
Compact(cf, "p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel(cf));
// Populate a different range
MakeTables(cf, 3, "c", "e");
ASSERT_EQ("1,1,2", FilesPerLevel(cf));
// Compact just the new range
Compact(cf, "b", "f");
ASSERT_EQ("0,0,2", FilesPerLevel(cf));
// Compact all
MakeTables(cf, 1, "a", "z");
ASSERT_EQ("0,1,2", FilesPerLevel(cf));
Compact(cf, "", "zzz");
ASSERT_EQ("0,0,1", FilesPerLevel(cf));
}
Close();
}
} // namespace rocksdb
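The automatic half of the summary is not exercised directly by this test. A hedged sketch of how it surfaces, reusing the fixture helpers above (the flush count and key are hypothetical):

  // Pile up flushed level-0 files in CF 1 ("one"). With this diff,
  // background threads scan every column family for compaction work
  // (see the PickCompaction loop in db/db_impl.cc below), so these
  // files become eligible for automatic compaction without touching
  // the default CF.
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Put(1, "key", "value"));
    ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
  }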

db/compaction.cc

@@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction.h"
#include "db/column_family.h"
namespace rocksdb {
@@ -29,6 +30,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(input_version),
number_levels_(input_version_->NumberLevels()),
cfd_(input_version_->cfd_),
seek_compaction_(seek_compaction),
enable_compression_(enable_compression),
grandparent_index_(0),
@@ -43,6 +45,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
input_version_->Ref();
edit_ = new VersionEdit();
edit_->SetColumnFamily(cfd_->GetID());
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
@@ -170,6 +173,10 @@ void Compaction::ReleaseInputs() {
}
}
void Compaction::ReleaseCompactionFiles(Status status) {
cfd_->compaction_picker()->ReleaseCompactionFiles(this, status);
}
void Compaction::ResetNextCompactionIndex() {
input_version_->ResetNextCompactionIndex(level_);
}

db/compaction.h

@@ -13,6 +13,7 @@
namespace rocksdb {
class Version;
class ColumnFamilyData;
// A Compaction encapsulates information about a compaction.
class Compaction {
@@ -36,6 +37,8 @@ class Compaction {
// Returns input version of the compaction
Version* input_version() const { return input_version_; }
ColumnFamilyData* column_family_data() const { return cfd_; }
// Return the ith input file at "level()+which" ("which" must be 0 or 1).
FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
@@ -67,6 +70,10 @@ class Compaction {
// is successful.
void ReleaseInputs();
// Clear all files to indicate that they are not being compacted
// Delete this compaction from the list of running compactions.
void ReleaseCompactionFiles(Status status);
void Summary(char* output, int len);
// Return the score that was used to pick this compaction run.
@@ -94,6 +101,7 @@ class Compaction {
Version* input_version_;
VersionEdit* edit_;
int number_levels_;
ColumnFamilyData* cfd_;
bool seek_compaction_;
bool enable_compression_;

db/db_impl.cc

@@ -1264,7 +1264,14 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
const Slice* begin, const Slice* end,
bool reduce_level, int target_level) {
Status s = FlushMemTable(default_cfd_, FlushOptions());
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
mutex_.Unlock();
// this asserts because a client calling DB methods with an invalid
// ColumnFamilyHandle is undefined behavior.
assert(cfd != nullptr);
Status s = FlushMemTable(cfd, FlushOptions());
if (!s.ok()) {
LogFlush(options_.info_log);
return s;
@@ -1273,8 +1280,8 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = default_cfd_->current();
for (int level = 1; level < NumberLevels(); level++) {
Version* base = cfd->current();
for (int level = 1; level < cfd->NumberLevels(); level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
@@ -1285,9 +1292,9 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
// bottom-most level, the output level will be the same as input one
if (options_.compaction_style == kCompactionStyleUniversal ||
level == max_level_with_files) {
s = RunManualCompaction(level, level, begin, end);
s = RunManualCompaction(cfd, level, level, begin, end);
} else {
s = RunManualCompaction(level, level + 1, begin, end);
s = RunManualCompaction(cfd, level, level + 1, begin, end);
}
if (!s.ok()) {
LogFlush(options_.info_log);
@@ -1296,7 +1303,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
}
if (reduce_level) {
s = ReFitLevel(max_level_with_files, target_level);
s = ReFitLevel(cfd, max_level_with_files, target_level);
}
LogFlush(options_.info_log);
@@ -1304,15 +1311,15 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
}
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(int level) {
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
mutex_.AssertHeld();
Version* current = default_cfd_->current();
Version* current = cfd->current();
int minimum_level = level;
for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty
if (current->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (default_cfd_->compaction_picker()->MaxBytesForLevel(i) <
if (cfd->compaction_picker()->MaxBytesForLevel(i) <
current->NumLevelBytes(level)) {
break;
}
@@ -1322,8 +1329,8 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) {
return minimum_level;
}
Status DBImpl::ReFitLevel(int level, int target_level) {
assert(level < NumberLevels());
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
assert(level < cfd->NumberLevels());
SuperVersion* superversion_to_free = nullptr;
SuperVersion* new_superversion = new SuperVersion();
@@ -1351,7 +1358,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
// move to a smaller level
int to_level = target_level;
if (target_level < 0) {
to_level = FindMinimumEmptyLevelFitting(level);
to_level = FindMinimumEmptyLevelFitting(cfd, level);
}
assert(to_level <= level);
@@ -1359,10 +1366,11 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Status status;
if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s",
default_cfd_->current()->DebugString().data());
cfd->current()->DebugString().data());
VersionEdit edit;
for (const auto& f : default_cfd_->current()->files_[level]) {
edit.SetColumnFamily(cfd->GetID());
for (const auto& f : cfd->current()->files_[level]) {
edit.DeleteFile(level, f->number);
edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
@@ -1370,16 +1378,15 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Log(options_.info_log, "Apply version edit:\n%s",
edit.DebugString().data());
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_,
db_directory_.get());
superversion_to_free = default_cfd_->InstallSuperVersion(new_superversion);
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
superversion_to_free = cfd->InstallSuperVersion(new_superversion);
new_superversion = nullptr;
Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data());
if (status.ok()) {
Log(options_.info_log, "After refitting:\n%s",
default_cfd_->current()->DebugString().data());
cfd->current()->DebugString().data());
}
}
@@ -1607,15 +1614,15 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
return status;
}
Status DBImpl::RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, const Slice* begin,
const Slice* end) {
assert(input_level >= 0);
InternalKey begin_storage, end_storage;
ManualCompaction manual;
manual.cfd = cfd;
manual.input_level = input_level;
manual.output_level = output_level;
manual.done = false;
@@ -1630,7 +1637,7 @@ Status DBImpl::RunManualCompaction(int input_level,
manual.begin = &begin_storage;
}
if (end == nullptr ||
options_.compaction_style == kCompactionStyleUniversal) {
cfd->options()->compaction_style == kCompactionStyleUniversal) {
manual.end = nullptr;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
@@ -1686,7 +1693,7 @@ Status DBImpl::TEST_CompactRange(int level,
int output_level = (options_.compaction_style == kCompactionStyleUniversal)
? level
: level + 1;
return RunManualCompaction(level, output_level, begin, end);
return RunManualCompaction(default_cfd_, level, output_level, begin, end);
}
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
@@ -1946,8 +1953,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (is_manual) {
ManualCompaction* m = manual_compaction_;
assert(m->in_progress);
c.reset(default_cfd_->CompactRange(m->input_level, m->output_level,
m->begin, m->end, &manual_end));
c.reset(m->cfd->CompactRange(m->input_level, m->output_level, m->begin,
m->end, &manual_end));
if (!c) {
m->done = true;
}
@@ -1962,7 +1969,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
? "(end)"
: manual_end->DebugString().c_str()));
} else if (!options_.disable_auto_compactions) {
c.reset(default_cfd_->PickCompaction());
for (auto cfd : *versions_->GetColumnFamilySet()) {
c.reset(cfd->PickCompaction());
if (c != nullptr) {
break;
}
}
}
Status status;
@@ -1977,23 +1989,23 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(default_cfd_, c->edit(), &mutex_,
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(default_cfd_, deletion_state);
InstallSuperVersion(c->column_family_data(), deletion_state);
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), default_cfd_->current()->LevelSummary(&tmp));
default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status);
status.ToString().c_str(), c->input_version()->LevelSummary(&tmp));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact, deletion_state);
CleanupCompaction(compact, status);
default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status);
c->ReleaseCompactionFiles(status);
c->ReleaseInputs();
*madeProgress = true;
}
@@ -2123,8 +2135,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
if (s.ok()) {
// Over-estimate slightly so we don't end up just barely crossing
// the threshold.
ColumnFamilyData* cfd = compact->compaction->column_family_data();
compact->outfile->SetPreallocationBlockSize(
1.1 * default_cfd_->compaction_picker()->MaxFileSizeForLevel(
1.1 * cfd->compaction_picker()->MaxFileSizeForLevel(
compact->compaction->output_level()));
CompressionType compression_type = GetCompressionType(
@@ -2228,8 +2241,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
compact->compaction->output_level(), out.number, out.file_size,
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(default_cfd_, compact->compaction->edit(),
&mutex_, db_directory_.get());
return versions_->LogAndApply(compact->compaction->column_family_data(),
compact->compaction->edit(), &mutex_,
db_directory_.get());
}
//
@@ -2264,13 +2278,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
DeletionState& deletion_state) {
assert(compact);
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
ColumnFamilyData* cfd = compact->compaction->column_family_data();
Log(options_.info_log,
"Compacting %d@%d + %d@%d files, score %.2f slots available %d",
compact->compaction->num_input_files(0),
compact->compaction->level(),
compact->compaction->num_input_files(1),
compact->compaction->output_level(),
compact->compaction->score(),
"[CF %u] Compacting %d@%d + %d@%d files, score %.2f slots available %d",
cfd->GetID(), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->num_input_files(1),
compact->compaction->output_level(), compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[256];
compact->compaction->Summary(scratch, sizeof(scratch));
@@ -2321,7 +2334,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
MergeHelper merge(user_comparator(), options_.merge_operator.get(),
options_.info_log.get(),
false /* internal key corruption is expected */);
auto compaction_filter = options_.compaction_filter;
auto compaction_filter = cfd->options()->compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (!compaction_filter) {
auto context = compact->GetFilterContext();
@@ -2333,12 +2346,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
// Prioritize immutable compaction work
// TODO: remove memtable flush from normal compaction work
if (default_cfd_->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
if (cfd->imm()->imm_flush_needed.NoBarrier_Load() != nullptr) {
const uint64_t imm_start = env_->NowMicros();
LogFlush(options_.info_log);
mutex_.Lock();
if (default_cfd_->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(default_cfd_, nullptr, deletion_state);
if (cfd->imm()->IsFlushPending()) {
FlushMemTableToOutputFile(cfd, nullptr, deletion_state);
bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary
}
mutex_.Unlock();
@@ -2388,11 +2401,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// the entry with a delete marker.
bool value_changed = false;
compaction_filter_value.clear();
bool to_delete =
compaction_filter->Filter(compact->compaction->level(),
ikey.user_key, value,
&compaction_filter_value,
&value_changed);
bool to_delete = compaction_filter->Filter(
compact->compaction->level(), ikey.user_key, value,
&compaction_filter_value, &value_changed);
if (to_delete) {
// make a copy of the original key
delete_key.assign(key.data(), key.data() + key.size());
@@ -2410,7 +2421,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
value = compaction_filter_value;
}
}
}
// If there are no snapshots, then this kv affects visibility at tip.
@@ -2649,7 +2659,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (status.ok()) {
status = InstallCompactionResults(compact);
InstallSuperVersion(default_cfd_, deletion_state);
InstallSuperVersion(cfd, deletion_state);
}
Version::LevelSummaryStorage tmp;
Log(options_.info_log,
@@ -2847,7 +2857,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
// this is asserting because client calling Get() with undefined
// this asserts because a client calling DB methods with an invalid
// ColumnFamilyHandle is undefined behavior.
assert(cfd != nullptr);
SuperVersion* get_version = cfd->GetSuperVersion()->Ref();
@@ -3538,7 +3548,8 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
const Slice& property, std::string* value) {
value->clear();
MutexLock l(&mutex_);
return internal_stats_.GetProperty(property, value, default_cfd_);
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
return internal_stats_.GetProperty(property, value, cfd);
}
void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
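The GetProperty() change just above routes property lookups through the handle's column family instead of always using the default one. A minimal sketch, mirroring the call NumTableFilesAtLevel() makes in the test file:

  // Level-0 file count for the column family behind handles_[1]; the
  // property string is the same "rocksdb.num-files-at-level<N>" family
  // the test helper builds with NumberToString(level).
  std::string num_files;
  bool ok = db_->GetProperty(handles_[1], "rocksdb.num-files-at-level0",
                             &num_files);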

db/db_impl.h

@@ -129,9 +129,8 @@ class DBImpl : public DB {
virtual Status GetDbIdentity(std::string& identity);
Status RunManualCompaction(int input_level,
int output_level,
const Slice* begin,
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, const Slice* begin,
const Slice* end);
// Extra methods (for testing) that are not in the public DB interface
@@ -361,12 +360,12 @@ class DBImpl : public DB {
// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(int level);
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level);
// Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could
// hold the data set.
Status ReFitLevel(int level, int target_level = -1);
Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
// Returns the current SuperVersion number.
uint64_t CurrentVersionNumber() const;
@@ -428,6 +427,7 @@ class DBImpl : public DB {
// Information for a manual compaction
struct ManualCompaction {
ColumnFamilyData* cfd;
int input_level;
int output_level;
bool done;

db/version_set.cc

@@ -241,7 +241,8 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
const EnvOptions& soptions,
int level) const {
Iterator* level_iter = new LevelFileNumIterator(vset_->icmp_, &files_[level]);
Iterator* level_iter =
new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]);
if (options.prefix) {
InternalKey internal_prefix(*options.prefix, 0, kTypeValue);
if (!PrefixMayMatch(options, soptions,
@@ -2283,7 +2284,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(
c->input_version()->cfd_->internal_comparator(),
c->column_family_data()->internal_comparator(),
c->inputs(which)),
&GetFileIterator, table_cache_, options, storage_options_,
true /* for compaction */);
@@ -2291,7 +2292,8 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
}
}
assert(num <= space);
Iterator* result = NewMergingIterator(&icmp_, list, num);
Iterator* result = NewMergingIterator(
&c->column_family_data()->internal_comparator(), list, num);
delete[] list;
return result;
}
@@ -2300,8 +2302,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
// in the current version
bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
// TODO this only works for default column family now
Version* version = column_family_set_->GetDefault()->current();
Version* version = c->column_family_data()->current();
if (c->input_version() != version) {
Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch");
}