RocksDB: Allow Level-Style Compaction to Place Files in Different Paths

Summary:
Allow Level-style compaction to place files in different paths
This diff provides the code for task 4854591. We now support level-compaction
to place files in different paths by specifying  them in db_paths  along with
the minimum level for files to store in that path.

Test Plan: ManualLevelCompactionOutputPathId in db_test.cc

Reviewers: yhchiang, MarkCallaghan, dhruba, yoshinorim, sdong

Reviewed By: sdong

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D29799
This commit is contained in:
Venkatesh Radhakrishnan 2014-12-15 21:48:16 -08:00
parent 06eed650a0
commit 153f4f0719
7 changed files with 394 additions and 9 deletions

View File

@ -17,6 +17,10 @@
* New API LinkFile added to Env. If you implement your own Env class, an
implementation of the API LinkFile will have to be provided.
* MemTableRep takes MemTableAllocator instead of Arena
* We now allow level-compaction to place files in different paths by
specifying them in db_paths along with the target_size.
Lower numbered levels will be placed earlier in the db_paths and higher
numbered levels will be placed later in the db_paths vector.
### Improvements
* RocksDBLite library now becomes smaller and will be compiled with -fno-exceptions flag.

View File

@ -769,6 +769,44 @@ Compaction* LevelCompactionPicker::PickCompaction(
return c;
}
/*
* Find the optimal path to place a file
* Given a level, finds the path where levels up to it will fit in levels
* up to and including this path
*/
uint32_t LevelCompactionPicker::GetPathId(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, int level) {
uint32_t p = 0;
assert(!ioptions.db_paths.empty());
// size remaining in the most recent path
uint64_t current_path_size = ioptions.db_paths[0].target_size;
uint64_t level_size;
int cur_level = 0;
level_size = mutable_cf_options.max_bytes_for_level_base;
// Last path is the fallback
while (p < ioptions.db_paths.size() - 1) {
if (level_size <= current_path_size) {
if (cur_level == level) {
// Does desired level fit in this path?
return p;
} else {
current_path_size -= level_size;
level_size *= mutable_cf_options.max_bytes_for_level_multiplier;
cur_level++;
continue;
}
}
p++;
current_path_size = ioptions.db_paths[p].target_size;
}
return p;
}
Compaction* LevelCompactionPicker::PickCompactionBySize(
const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage,
int level, double score) {
@ -786,7 +824,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
assert(level + 1 < NumberLevels());
c = new Compaction(vstorage->num_levels(), level, level + 1,
mutable_cf_options.MaxFileSizeForLevel(level + 1),
mutable_cf_options.MaxGrandParentOverlapBytes(level), 0,
mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, level + 1),
GetCompressionType(ioptions_, level + 1));
c->score_ = score;
@ -960,6 +999,7 @@ uint32_t UniversalCompactionPicker::GetPathId(
uint64_t future_size = file_size *
(100 - ioptions.compaction_options_universal.size_ratio) / 100;
uint32_t p = 0;
assert(!ioptions.db_paths.empty());
for (; p < ioptions.db_paths.size() - 1; p++) {
uint64_t target_size = ioptions.db_paths[p].target_size;
if (target_size > file_size &&

View File

@ -188,6 +188,11 @@ class LevelCompactionPicker : public CompactionPicker {
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
override;
// Pick a path ID to place a newly generated file, with its level
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options,
int level);
private:
// For the specfied level, pick a compaction.
// Returns nullptr if there is no compaction to be done.

View File

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/compaction_picker.h"
#include <limits>
#include <string>
#include "util/logging.h"
#include "util/testharness.h"
@ -47,6 +48,8 @@ class CompactionPickerTest {
fifo_options_.max_table_files_size = 1;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
size_being_compacted_.resize(options_.num_levels);
ioptions_.db_paths.emplace_back("dummy",
std::numeric_limits<uint64_t>::max());
}
~CompactionPickerTest() {

View File

@ -67,6 +67,7 @@
#include "util/build_version.h"
#include "util/coding.h"
#include "util/db_info_dumper.h"
#include "util/file_util.h"
#include "util/hash_skiplist_rep.h"
#include "util/hash_linklist_rep.h"
#include "util/logging.h"
@ -2059,10 +2060,31 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
FileMetaData ftemp;
uint64_t fdnum = f->fd.GetNumber();
uint32_t fdpath = f->fd.GetPathId();
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
// Need to move file if file is to be stored in a new path
if (c->GetOutputPathId() != f->fd.GetPathId()) {
fdnum = versions_->NewFileNumber();
std::string source = TableFileName(db_options_.db_paths,
f->fd.GetNumber(), f->fd.GetPathId());
std::string destination =
TableFileName(db_options_.db_paths, fdnum, c->GetOutputPathId());
Status s = CopyFile(env_, source, destination, 0);
if (s.ok()) {
fdpath = c->GetOutputPathId();
} else {
fdnum = f->fd.GetNumber();
if (!s.IsShutdownInProgress()) {
Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
"Compaction error: %s", s.ToString().c_str());
}
}
}
c->edit()->AddFile(c->level() + 1, fdnum, fdpath, f->fd.GetFileSize(),
f->smallest, f->largest, f->smallest_seqno,
f->largest_seqno);
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(),
c->edit(), &mutex_, db_directory_.get());
@ -3519,12 +3541,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (!s.ok()) {
return s;
}
if (db_options.db_paths.size() > 1) {
for (auto& cfd : column_families) {
if (cfd.options.compaction_style != kCompactionStyleUniversal) {
if ((cfd.options.compaction_style != kCompactionStyleUniversal) &&
(cfd.options.compaction_style != kCompactionStyleLevel)) {
return Status::NotSupported(
"More than one DB paths are only supported in "
"universal compaction style. ");
"universal and level compaction styles. ");
}
}

View File

@ -1004,6 +1004,12 @@ class DBTest {
return size;
}
void Compact(int cf, const Slice& start, const Slice& limit,
uint32_t target_path_id) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit, false, -1,
target_path_id));
}
void Compact(int cf, const Slice& start, const Slice& limit) {
ASSERT_OK(db_->CompactRange(handles_[cf], &start, &limit));
}
@ -4087,6 +4093,233 @@ TEST(DBTest, UniversalCompactionSecondPathRatio) {
Destroy(options);
}
TEST(DBTest, LevelCompactionThirdPath) {
Options options;
options.db_paths.emplace_back(dbname_, 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
// options = CurrentOptions(options);
std::vector<std::string> filenames;
env_->GetChildren(options.db_paths[1].path, &filenames);
// Delete archival files.
for (size_t i = 0; i < filenames.size(); ++i) {
env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
}
env_->DeleteDir(options.db_paths[1].path);
Reopen(options);
Random rnd(301);
int key_idx = 0;
// First three 110KB files are not going to second path.
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
// Another 110KB triggers a compaction to 400K file to fill up first path
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path));
// (1, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4", FilesPerLevel(0));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 1)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,1", FilesPerLevel(0));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 2)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,2", FilesPerLevel(0));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 3)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,3", FilesPerLevel(0));
ASSERT_EQ(3, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,4", FilesPerLevel(0));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 5)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,5", FilesPerLevel(0));
ASSERT_EQ(5, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 6)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,6", FilesPerLevel(0));
ASSERT_EQ(6, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 7)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,7", FilesPerLevel(0));
ASSERT_EQ(7, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,8", FilesPerLevel(0));
ASSERT_EQ(8, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Reopen(options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Destroy(options);
}
TEST(DBTest, LevelCompactionPathUse) {
Options options;
options.db_paths.emplace_back(dbname_, 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
// options = CurrentOptions(options);
std::vector<std::string> filenames;
env_->GetChildren(options.db_paths[1].path, &filenames);
// Delete archival files.
for (size_t i = 0; i < filenames.size(); ++i) {
env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
}
env_->DeleteDir(options.db_paths[1].path);
Reopen(options);
Random rnd(301);
int key_idx = 0;
// Always gets compacted into 1 Level1 file,
// 0/1 Level 0 file
for (int num = 0; num < 3; num++) {
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
}
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,1", FilesPerLevel(0));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("0,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("0,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("0,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("0,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,1", FilesPerLevel(0));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Reopen(options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Destroy(options);
}
TEST(DBTest, UniversalCompactionFourPaths) {
Options options;
options.db_paths.emplace_back(dbname_, 300 * 1024);
@ -5953,6 +6186,75 @@ TEST(DBTest, ManualCompactionOutputPathId) {
.IsInvalidArgument());
}
TEST(DBTest, ManualLevelCompactionOutputPathId) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
options.max_background_flushes = 1;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
// iter - 0 with 7 levels
// iter - 1 with 3 levels
for (int iter = 0; iter < 2; ++iter) {
MakeTables(3, "p", "q", 1);
ASSERT_EQ("3", FilesPerLevel(1));
ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// Compaction range falls before files
Compact(1, "", "c");
ASSERT_EQ("3", FilesPerLevel(1));
// Compaction range falls after files
Compact(1, "r", "z");
ASSERT_EQ("3", FilesPerLevel(1));
// Compaction range overlaps files
Compact(1, "p1", "p9", 1);
ASSERT_EQ("0,1", FilesPerLevel(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// Populate a different range
MakeTables(3, "c", "e", 1);
ASSERT_EQ("3,1", FilesPerLevel(1));
// Compact just the new range
Compact(1, "b", "f", 1);
ASSERT_EQ("0,2", FilesPerLevel(1));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// Compact all
MakeTables(1, "a", "z", 1);
ASSERT_EQ("1,2", FilesPerLevel(1));
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
db_->CompactRange(handles_[1], nullptr, nullptr, false, 1, 1);
ASSERT_EQ("0,1", FilesPerLevel(1));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
if (iter == 0) {
DestroyAndReopen(options);
options = CurrentOptions();
options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
options.max_background_flushes = 1;
options.num_levels = 3;
options.create_if_missing = true;
CreateAndReopenWithCF({"pikachu"}, options);
}
}
}
TEST(DBTest, DBOpen_Options) {
Options options = CurrentOptions();
std::string dbname = test::TmpDir(env_) + "/db_options_test";

View File

@ -40,6 +40,7 @@
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/file_util.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
@ -194,6 +195,12 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->storage_info()->PickLevelForMemTableOutput(
mutable_cf_options_, min_user_key, max_user_key);
// If level does not match path id, reset level back to 0
uint32_t fdpath = LevelCompactionPicker::GetPathId(
*cfd_->ioptions(), mutable_cf_options_, level);
if (fdpath != 0) {
level = 0;
}
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,