Implement DB::PromoteL0 method
Summary: This diff implements a new `DB` method `PromoteL0` which moves all files in L0 to a given level skipping compaction, provided that the files have disjoint ranges and all levels up to the target level are empty. This method provides finer-grain control for trivial compactions, and it is useful for bulk-loading pre-sorted keys. Compared to D34797, it does not change the semantics of an existing operation, which can impact existing code. PromoteL0 is designed to work well in combination with the proposed `GetSstFileWriter`/`AddFile` interface, enabling to "design" the level structure by populating one level at a time. Such fine-grained control can be very useful for static or mostly-static databases. Test Plan: `make check` Reviewers: IslamAbdelRahman, philipp, MarkCallaghan, yhchiang, igor, sdong Reviewed By: sdong Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D37107
This commit is contained in:
parent
9bf40b64d0
commit
2dc421df48
@ -187,6 +187,8 @@ class DBImpl : public DB {
|
||||
Status SuggestCompactRange(ColumnFamilyHandle* column_family,
|
||||
const Slice* begin, const Slice* end);
|
||||
|
||||
Status PromoteL0(ColumnFamilyHandle* column_family, int target_level);
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// checks if all live files exist on file system and that their file sizes
|
||||
|
@ -13,9 +13,11 @@
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#endif
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <vector>
|
||||
|
||||
#include "db/column_family.h"
|
||||
#include "db/job_context.h"
|
||||
#include "db/version_set.h"
|
||||
#include "rocksdb/status.h"
|
||||
|
||||
@ -54,6 +56,95 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family,
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
|
||||
assert(column_family);
|
||||
|
||||
if (target_level < 1) {
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"PromoteL0 FAILED. Invalid target level %d\n", target_level);
|
||||
return Status::InvalidArgument("Invalid target level");
|
||||
}
|
||||
|
||||
Status status;
|
||||
VersionEdit edit;
|
||||
JobContext job_context(next_job_id_.fetch_add(1), true);
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
|
||||
const auto* vstorage = cfd->current()->storage_info();
|
||||
|
||||
if (target_level >= vstorage->num_levels()) {
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"PromoteL0 FAILED. Target level %d does not exist\n", target_level);
|
||||
job_context.Clean();
|
||||
return Status::InvalidArgument("Target level does not exist");
|
||||
}
|
||||
|
||||
// Sort L0 files by range.
|
||||
const InternalKeyComparator* icmp = &cfd->internal_comparator();
|
||||
auto l0_files = vstorage->LevelFiles(0);
|
||||
std::sort(l0_files.begin(), l0_files.end(),
|
||||
[icmp](FileMetaData* f1, FileMetaData* f2) {
|
||||
return icmp->Compare(f1->largest, f2->largest) < 0;
|
||||
});
|
||||
|
||||
// Check that no L0 file is being compacted and that they have
|
||||
// non-overlapping ranges.
|
||||
for (size_t i = 0; i < l0_files.size(); ++i) {
|
||||
auto f = l0_files[i];
|
||||
if (f->being_compacted) {
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"PromoteL0 FAILED. File %" PRIu64 " being compacted\n",
|
||||
f->fd.GetNumber());
|
||||
job_context.Clean();
|
||||
return Status::InvalidArgument("PromoteL0 called during L0 compaction");
|
||||
}
|
||||
|
||||
if (i == 0) continue;
|
||||
auto prev_f = l0_files[i - 1];
|
||||
if (icmp->Compare(prev_f->largest, f->smallest) >= 0) {
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64
|
||||
" have overlapping ranges\n",
|
||||
prev_f->fd.GetNumber(), f->fd.GetNumber());
|
||||
job_context.Clean();
|
||||
return Status::InvalidArgument("L0 has overlapping files");
|
||||
}
|
||||
}
|
||||
|
||||
// Check that all levels up to target_level are empty.
|
||||
for (int level = 1; level <= target_level; ++level) {
|
||||
if (vstorage->NumLevelFiles(level) > 0) {
|
||||
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
|
||||
"PromoteL0 FAILED. Level %d not empty\n", level);
|
||||
job_context.Clean();
|
||||
return Status::InvalidArgument(
|
||||
"All levels up to target_level "
|
||||
"must be empty");
|
||||
}
|
||||
}
|
||||
|
||||
edit.SetColumnFamily(cfd->GetID());
|
||||
for (const auto& f : l0_files) {
|
||||
edit.DeleteFile(0, f->fd.GetNumber());
|
||||
edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
f->smallest_seqno, f->largest_seqno);
|
||||
}
|
||||
|
||||
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
|
||||
&edit, &mutex_, directories_.GetDbDir());
|
||||
if (status.ok()) {
|
||||
InstallSuperVersionBackground(cfd, &job_context,
|
||||
*cfd->GetLatestMutableCFOptions());
|
||||
}
|
||||
} // lock released here
|
||||
LogFlush(db_options_.info_log);
|
||||
job_context.Clean();
|
||||
|
||||
return status;
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -12619,6 +12619,72 @@ TEST_F(DBTest, SuggestCompactRangeTest) {
|
||||
ASSERT_EQ("0,1,13", FilesPerLevel(0));
|
||||
}
|
||||
|
||||
TEST_F(DBTest, PromoteL0) {
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 10 * 1024 * 1024;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// non overlapping ranges
|
||||
std::vector<std::pair<int32_t, int32_t>> ranges = {
|
||||
{81, 160}, {0, 80}, {161, 240}, {241, 320}};
|
||||
|
||||
int32_t value_size = 10 * 1024; // 10 KB
|
||||
|
||||
Random rnd(301);
|
||||
std::map<int32_t, std::string> values;
|
||||
for (const auto& range : ranges) {
|
||||
for (int32_t j = range.first; j < range.second; j++) {
|
||||
values[j] = RandomString(&rnd, value_size);
|
||||
ASSERT_OK(Put(Key(j), values[j]));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
|
||||
int32_t level0_files = NumTableFilesAtLevel(0, 0);
|
||||
ASSERT_EQ(level0_files, ranges.size());
|
||||
ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
|
||||
|
||||
// Promote L0 level to L2.
|
||||
ASSERT_OK(experimental::PromoteL0(db_, db_->DefaultColumnFamily(), 2));
|
||||
// We expect that all the files were trivially moved from L0 to L2
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
|
||||
ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files);
|
||||
|
||||
for (const auto& kv : values) {
|
||||
ASSERT_EQ(Get(Key(kv.first)), kv.second);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DBTest, PromoteL0Failure) {
|
||||
Options options = CurrentOptions();
|
||||
options.disable_auto_compactions = true;
|
||||
options.write_buffer_size = 10 * 1024 * 1024;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
// Produce two L0 files with overlapping ranges.
|
||||
ASSERT_OK(Put(Key(0), ""));
|
||||
ASSERT_OK(Put(Key(3), ""));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put(Key(1), ""));
|
||||
ASSERT_OK(Flush());
|
||||
|
||||
Status status;
|
||||
// Fails because L0 has overlapping files.
|
||||
status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
|
||||
ASSERT_TRUE(status.IsInvalidArgument());
|
||||
|
||||
ASSERT_OK(db_->CompactRange(nullptr, nullptr));
|
||||
// Now there is a file in L1.
|
||||
ASSERT_GE(NumTableFilesAtLevel(1, 0), 1);
|
||||
|
||||
ASSERT_OK(Put(Key(5), ""));
|
||||
ASSERT_OK(Flush());
|
||||
// Fails because L1 is non-empty.
|
||||
status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
|
||||
ASSERT_TRUE(status.IsInvalidArgument());
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -16,11 +16,20 @@ Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
|
||||
const Slice* begin, const Slice* end) {
|
||||
auto dbimpl = dynamic_cast<DBImpl*>(db);
|
||||
if (dbimpl == nullptr) {
|
||||
return Status::NotSupported("Didn't recognize DB object");
|
||||
return Status::InvalidArgument("Didn't recognize DB object");
|
||||
}
|
||||
|
||||
return dbimpl->SuggestCompactRange(column_family, begin, end);
|
||||
}
|
||||
|
||||
Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
|
||||
auto dbimpl = dynamic_cast<DBImpl*>(db);
|
||||
if (dbimpl == nullptr) {
|
||||
return Status::InvalidArgument("Didn't recognize DB object");
|
||||
}
|
||||
return dbimpl->PromoteL0(column_family, target_level);
|
||||
}
|
||||
|
||||
#else // ROCKSDB_LITE
|
||||
|
||||
Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
|
||||
@ -28,6 +37,10 @@ Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
|
||||
return Status::NotSupported("Not supported in RocksDB LITE");
|
||||
}
|
||||
|
||||
Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
|
||||
return Status::NotSupported("Not supported in RocksDB LITE");
|
||||
}
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
|
||||
|
@ -16,5 +16,14 @@ Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
|
||||
const Slice* begin, const Slice* end);
|
||||
Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end);
|
||||
|
||||
// Move all L0 files to target_level skipping compaction.
|
||||
// This operation succeeds only if the files in L0 have disjoint ranges; this
|
||||
// is guaranteed to happen, for instance, if keys are inserted in sorted
|
||||
// order. Furthermore, all levels between 1 and target_level must be empty.
|
||||
// If any of the above condition is violated, InvalidArgument will be
|
||||
// returned.
|
||||
Status PromoteL0(DB* db, ColumnFamilyHandle* column_family,
|
||||
int target_level = 1);
|
||||
|
||||
} // namespace experimental
|
||||
} // namespace rocksdb
|
||||
|
Loading…
Reference in New Issue
Block a user