Support multiple DB directories in universal compaction style

Summary:
This patch adds a target size parameter in options.db_paths and universal compaction will base it to determine which DB path to place a new file.
Level-style stays the same.

Test Plan: Add new unit tests

Reviewers: ljin, yhchiang

Reviewed By: yhchiang

Subscribers: MarkCallaghan, dhruba, igor, leveldb

Differential Revision: https://reviews.facebook.net/D19869
This commit is contained in:
sdong 2014-07-14 15:34:30 -07:00
parent 20c056306b
commit 0abaed2e08
13 changed files with 326 additions and 31 deletions

View File

@ -1,5 +1,13 @@
# Rocksdb Change Log
### Unrelease
### New Features
* Support Multiple DB paths in universal style compactions
### Public API changes
* DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size
## 3.3.0 (7/10/2014)
### New Features
* Added JSON API prototype.

View File

@ -635,6 +635,37 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
return c;
}
uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
uint64_t file_size) {
// Two conditions need to be satisfied:
// (1) the target path needs to be able to hold the file's size
// (2) Total size left in this and previous paths need to be not
// smaller than expected future file size before this new file is
// compacted, which is estimated based on size_ratio.
// For example, if now we are compacting files of size (1, 1, 2, 4, 8),
// we will make sure the target file, probably with size of 16, will be
// placed in a path so that eventually when new files are generated and
// compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
// before the path we chose.
//
// TODO(sdong): now the case of multiple column families is not
// considered in this algorithm. So the target size can be violated in
// that case. We need to improve it.
uint64_t accumulated_size = 0;
uint64_t future_size =
file_size * (100 - options.compaction_options_universal.size_ratio) / 100;
uint32_t p = 0;
for (; p < options.db_paths.size() - 1; p++) {
uint64_t target_size = options.db_paths[p].target_size;
if (target_size > file_size &&
accumulated_size + (target_size - file_size) > future_size) {
return p;
}
accumulated_size += target_size;
}
return p;
}
//
// Consider compaction files based on their size differences with
// the next file in time order.
@ -765,8 +796,15 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
}
}
}
uint64_t estimated_total_size = 0;
for (unsigned int i = 0; i < first_index_after; i++) {
estimated_total_size += files[i]->fd.GetFileSize();
}
uint32_t path_id = GetPathId(*options_, estimated_total_size);
Compaction* c = new Compaction(
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, 0,
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, path_id,
GetCompressionType(*options_, level, enable_compression));
c->score_ = score;
@ -865,11 +903,18 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
}
assert(start_index >= 0 && start_index < files.size() - 1);
// Estimate total file size
uint64_t estimated_total_size = 0;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
estimated_total_size += files[loop]->fd.GetFileSize();
}
uint32_t path_id = GetPathId(*options_, estimated_total_size);
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(version, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, 0, GetCompressionType(*options_, level));
LLONG_MAX, path_id, GetCompressionType(*options_, level));
c->score_ = score;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
f = c->input_version_->files_[level][loop];

View File

@ -145,6 +145,10 @@ class UniversalCompactionPicker : public CompactionPicker {
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
LogBuffer* log_buffer);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
static uint32_t GetPathId(const Options& options, uint64_t file_size);
};
class LevelCompactionPicker : public CompactionPicker {

View File

@ -299,7 +299,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
}
if (result.db_paths.size() == 0) {
result.db_paths.push_back(dbname);
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
}
return result;
@ -1105,8 +1105,8 @@ Status DBImpl::Recover(
return s;
}
for (auto db_path : options_.db_paths) {
s = env_->CreateDirIfMissing(db_path);
for (auto& db_path : options_.db_paths) {
s = env_->CreateDirIfMissing(db_path.path);
if (!s.ok()) {
return s;
}
@ -4609,8 +4609,18 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
if (db_options.db_paths.size() > 1) {
return Status::NotSupported(
"More than one DB paths are not supported yet. ");
for (auto& cfd : column_families) {
if (cfd.options.compaction_style != kCompactionStyleUniversal) {
return Status::NotSupported(
"More than one DB paths are only supported in "
"universal compaction style. ");
}
}
if (db_options.db_paths.size() > 4) {
return Status::NotSupported(
"More than four DB paths are not supported yet. ");
}
}
*dbptr = nullptr;
@ -4629,8 +4639,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
DBImpl* impl = new DBImpl(db_options, dbname);
Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
if (s.ok()) {
for (auto path : impl->options_.db_paths) {
s = impl->env_->CreateDirIfMissing(path);
for (auto db_path : impl->options_.db_paths) {
s = impl->env_->CreateDirIfMissing(db_path.path);
if (!s.ok()) {
break;
}
@ -4798,14 +4808,14 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
}
}
for (auto db_path : options.db_paths) {
env->GetChildren(db_path, &filenames);
for (auto& db_path : options.db_paths) {
env->GetChildren(db_path.path, &filenames);
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
Status del = env->DeleteFile(db_path + "/" + filenames[i]);
Status del = env->DeleteFile(db_path.path + "/" + filenames[i]);
if (result.ok() && !del.ok()) {
result = del;
}

View File

@ -370,8 +370,10 @@ class DBTest {
~DBTest() {
Close();
Options options;
options.db_paths.push_back(dbname_);
options.db_paths.push_back(dbname_ + "_2");
options.db_paths.emplace_back(dbname_, 0);
options.db_paths.emplace_back(dbname_ + "_2", 0);
options.db_paths.emplace_back(dbname_ + "_3", 0);
options.db_paths.emplace_back(dbname_ + "_4", 0);
ASSERT_OK(DestroyDB(dbname_, options));
delete env_;
delete filter_policy_;
@ -3474,10 +3476,206 @@ TEST(DBTest, UniversalCompactionCompressRatio2) {
TEST(DBTest, FailMoreDbPaths) {
Options options;
options.db_paths.push_back(dbname_);
options.db_paths.push_back(dbname_ + "_2");
options.db_paths.emplace_back(dbname_, 10000000);
options.db_paths.emplace_back(dbname_ + "_2", 1000000);
options.db_paths.emplace_back(dbname_ + "_3", 1000000);
options.db_paths.emplace_back(dbname_ + "_4", 1000000);
options.db_paths.emplace_back(dbname_ + "_5", 1000000);
ASSERT_TRUE(TryReopen(&options).IsNotSupported());
}
TEST(DBTest, UniversalCompactionSecondPathRatio) {
Options options;
options.db_paths.emplace_back(dbname_, 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 1024 * 1024 * 1024);
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
options = CurrentOptions(options);
std::vector<std::string> filenames;
env_->GetChildren(options.db_paths[1].path, &filenames);
// Delete archival files.
for (size_t i = 0; i < filenames.size(); ++i) {
env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
}
env_->DeleteDir(options.db_paths[1].path);
Reopen(&options);
Random rnd(301);
int key_idx = 0;
// First three 110KB files are not going to second path.
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
// Another 110KB triggers a compaction to 400K file to second path
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
// (1, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1,1,4) -> (2, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 2, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(2, GetSstFileCount(dbname_));
// (1, 1, 2, 4) -> (8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 1, 8) -> (2, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 2, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(2, GetSstFileCount(dbname_));
// (1, 1, 2, 8) -> (4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Reopen(&options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Destroy(&options);
}
TEST(DBTest, UniversalCompactionFourPaths) {
Options options;
options.db_paths.emplace_back(dbname_, 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
options = CurrentOptions(options);
std::vector<std::string> filenames;
env_->GetChildren(options.db_paths[1].path, &filenames);
// Delete archival files.
for (size_t i = 0; i < filenames.size(); ++i) {
env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
}
env_->DeleteDir(options.db_paths[1].path);
Reopen(&options);
Random rnd(301);
int key_idx = 0;
// First three 110KB files are not going to second path.
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
// Another 110KB triggers a compaction to 400K file to second path
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
// (1, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1,1,4) -> (2, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// (1, 2, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 1, 2, 4) -> (8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
// (1, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 1, 8) -> (2, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
// (1, 2, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
// (1, 1, 2, 8) -> (4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
// (1, 4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(1, GetSstFileCount(options.db_paths[3].path));
ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
ASSERT_EQ(1, GetSstFileCount(dbname_));
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Reopen(&options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Destroy(&options);
}
#endif
TEST(DBTest, ConvertCompactionStyle) {

View File

@ -71,14 +71,14 @@ std::string MakeTableFileName(const std::string& path, uint64_t number) {
return MakeFileName(path, number, "sst");
}
std::string TableFileName(const std::vector<std::string> db_paths,
uint64_t number, uint32_t path_id) {
std::string TableFileName(const std::vector<DbPath>& db_paths, uint64_t number,
uint32_t path_id) {
assert(number > 0);
std::string path;
if (path_id >= db_paths.size()) {
path = db_paths.back();
path = db_paths.back().path;
} else {
path = db_paths[path_id];
path = db_paths[path_id].path;
}
return MakeTableFileName(path, number);
}

View File

@ -58,7 +58,7 @@ extern std::string MakeTableFileName(const std::string& name, uint64_t number);
// Return the name of the sstable with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".
extern std::string TableFileName(const std::vector<std::string> db_paths,
extern std::string TableFileName(const std::vector<DbPath>& db_paths,
uint64_t number, uint32_t path_id);
extern std::string FormatFileNumber(uint64_t number, uint32_t path_id);

View File

@ -108,8 +108,9 @@ TEST(FileNameTest, Construction) {
ASSERT_EQ(192U, number);
ASSERT_EQ(kLogFile, type);
fname = TableFileName({"bar"}, 200, 0);
std::string fname1 = TableFileName({"foo", "bar"}, 200, 1);
fname = TableFileName({DbPath("bar", 0)}, 200, 0);
std::string fname1 =
TableFileName({DbPath("foo", 0), DbPath("bar", 0)}, 200, 1);
ASSERT_EQ(fname, fname1);
ASSERT_EQ("bar/", std::string(fname.data(), 4));
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));

View File

@ -126,7 +126,8 @@ class Repairer {
std::vector<std::string> filenames;
bool found_file = false;
for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) {
Status status = env_->GetChildren(options_.db_paths[path_id], &filenames);
Status status =
env_->GetChildren(options_.db_paths[path_id].path, &filenames);
if (!status.ok()) {
return status;
}

View File

@ -85,7 +85,7 @@ class TableCache {
private:
Env* const env_;
const std::vector<std::string> db_paths_;
const std::vector<DbPath> db_paths_;
const Options* options_;
const EnvOptions& storage_options_;
Cache* const cache_;

View File

@ -2857,10 +2857,10 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
filemetadata.column_family_name = cfd->GetName();
uint32_t path_id = file->fd.GetPathId();
if (path_id < options_->db_paths.size()) {
filemetadata.db_path = options_->db_paths[path_id];
filemetadata.db_path = options_->db_paths[path_id].path;
} else {
assert(!options_->db_paths.empty());
filemetadata.db_path = options_->db_paths.back();
filemetadata.db_path = options_->db_paths.back().path;
}
filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
filemetadata.level = level;

View File

@ -83,6 +83,14 @@ enum UpdateStatus { // Return status For inplace update callback
UPDATED = 2, // No inplace update. Merged value set
};
struct DbPath {
std::string path;
uint64_t target_size; // Target size of total files under the path, in byte.
DbPath() : target_size(0) {}
DbPath(const std::string& p, uint64_t t) : path(p), target_size(t) {}
};
struct Options;
struct ColumnFamilyOptions {
@ -677,12 +685,31 @@ struct DBOptions {
// This options is not used!!
int db_stats_log_interval;
// A list paths where SST files can be put into. A compaction style can
// determine which of those paths it will put the file to.
// A list of paths where SST files can be put into, with its target size.
// Newer data is placed into paths specified earlier in the vector while
// older data gradually moves to paths specified later in the vector.
//
// For example, you have a flash device with 10GB allocated for the DB,
// as well as a hard drive of 2TB, you should config it to be:
// [{"/flash_path", 10GB}, {"/hard_drive", 2TB}]
//
// The system will try to guarantee data under each path is close to but
// not larger than the target size. But current and future file sizes used
// by determining where to place a file are based on best-effort estimation,
// which means there is a chance that the actual size under the directory
// is slightly more than target size under some workloads. User should give
// some buffer room for those cases.
//
// If none of the paths has sufficient room to place a file, the file will
// be placed to the last path anyway, despite to the target size.
//
// Placing newer data to ealier paths is also best-efforts. User should
// expect user files to be placed in higher levels in some extreme cases.
//
// If left empty, only one path will be used, which is db_name passed when
// opening the DB.
// Default: empty
std::vector<std::string> db_paths;
std::vector<DbPath> db_paths;
// This specifies the info LOG dir.
// If it is empty, the log files will be in the same dir as data.

View File

@ -18,6 +18,7 @@
#include <ctime>
#include <dirent.h>
#include <limits>
#include <sstream>
#include <string>
#include <stdexcept>
@ -287,7 +288,7 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
}
if (opt.db_paths.size() == 0) {
opt.db_paths.push_back(db_path_);
opt.db_paths.emplace_back(db_path_, std::numeric_limits<uint64_t>::max());
}
return opt;