diff --git a/db/db_impl.h b/db/db_impl.h index e1354731e..cf7e7fa76 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -187,6 +187,8 @@ class DBImpl : public DB { Status SuggestCompactRange(ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end); + Status PromoteL0(ColumnFamilyHandle* column_family, int target_level); + #endif // ROCKSDB_LITE // checks if all live files exist on file system and that their file sizes diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index 8d4b176eb..e8d5b1561 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -13,9 +13,11 @@ #define __STDC_FORMAT_MACROS #endif +#include #include #include "db/column_family.h" +#include "db/job_context.h" #include "db/version_set.h" #include "rocksdb/status.h" @@ -54,6 +56,95 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family, } return Status::OK(); } + +Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { + assert(column_family); + + if (target_level < 1) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "PromoteL0 FAILED. Invalid target level %d\n", target_level); + return Status::InvalidArgument("Invalid target level"); + } + + Status status; + VersionEdit edit; + JobContext job_context(next_job_id_.fetch_add(1), true); + { + InstrumentedMutexLock l(&mutex_); + auto* cfd = static_cast(column_family)->cfd(); + const auto* vstorage = cfd->current()->storage_info(); + + if (target_level >= vstorage->num_levels()) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "PromoteL0 FAILED. Target level %d does not exist\n", target_level); + job_context.Clean(); + return Status::InvalidArgument("Target level does not exist"); + } + + // Sort L0 files by range. + const InternalKeyComparator* icmp = &cfd->internal_comparator(); + auto l0_files = vstorage->LevelFiles(0); + std::sort(l0_files.begin(), l0_files.end(), + [icmp](FileMetaData* f1, FileMetaData* f2) { + return icmp->Compare(f1->largest, f2->largest) < 0; + }); + + // Check that no L0 file is being compacted and that they have + // non-overlapping ranges. + for (size_t i = 0; i < l0_files.size(); ++i) { + auto f = l0_files[i]; + if (f->being_compacted) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "PromoteL0 FAILED. File %" PRIu64 " being compacted\n", + f->fd.GetNumber()); + job_context.Clean(); + return Status::InvalidArgument("PromoteL0 called during L0 compaction"); + } + + if (i == 0) continue; + auto prev_f = l0_files[i - 1]; + if (icmp->Compare(prev_f->largest, f->smallest) >= 0) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "PromoteL0 FAILED. Files %" PRIu64 " and %" PRIu64 + " have overlapping ranges\n", + prev_f->fd.GetNumber(), f->fd.GetNumber()); + job_context.Clean(); + return Status::InvalidArgument("L0 has overlapping files"); + } + } + + // Check that all levels up to target_level are empty. + for (int level = 1; level <= target_level; ++level) { + if (vstorage->NumLevelFiles(level) > 0) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "PromoteL0 FAILED. Level %d not empty\n", level); + job_context.Clean(); + return Status::InvalidArgument( + "All levels up to target_level " + "must be empty"); + } + } + + edit.SetColumnFamily(cfd->GetID()); + for (const auto& f : l0_files) { + edit.DeleteFile(0, f->fd.GetNumber()); + edit.AddFile(target_level, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); + } + + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, directories_.GetDbDir()); + if (status.ok()) { + InstallSuperVersionBackground(cfd, &job_context, + *cfd->GetLatestMutableCFOptions()); + } + } // lock released here + LogFlush(db_options_.info_log); + job_context.Clean(); + + return status; +} #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index 0ebde90b1..f94e0ce21 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12619,6 +12619,72 @@ TEST_F(DBTest, SuggestCompactRangeTest) { ASSERT_EQ("0,1,13", FilesPerLevel(0)); } +TEST_F(DBTest, PromoteL0) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.write_buffer_size = 10 * 1024 * 1024; + DestroyAndReopen(options); + + // non overlapping ranges + std::vector> ranges = { + {81, 160}, {0, 80}, {161, 240}, {241, 320}}; + + int32_t value_size = 10 * 1024; // 10 KB + + Random rnd(301); + std::map values; + for (const auto& range : ranges) { + for (int32_t j = range.first; j < range.second; j++) { + values[j] = RandomString(&rnd, value_size); + ASSERT_OK(Put(Key(j), values[j])); + } + ASSERT_OK(Flush()); + } + + int32_t level0_files = NumTableFilesAtLevel(0, 0); + ASSERT_EQ(level0_files, ranges.size()); + ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1 + + // Promote L0 level to L2. + ASSERT_OK(experimental::PromoteL0(db_, db_->DefaultColumnFamily(), 2)); + // We expect that all the files were trivially moved from L0 to L2 + ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); + ASSERT_EQ(NumTableFilesAtLevel(2, 0), level0_files); + + for (const auto& kv : values) { + ASSERT_EQ(Get(Key(kv.first)), kv.second); + } +} + +TEST_F(DBTest, PromoteL0Failure) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.write_buffer_size = 10 * 1024 * 1024; + DestroyAndReopen(options); + + // Produce two L0 files with overlapping ranges. + ASSERT_OK(Put(Key(0), "")); + ASSERT_OK(Put(Key(3), "")); + ASSERT_OK(Flush()); + ASSERT_OK(Put(Key(1), "")); + ASSERT_OK(Flush()); + + Status status; + // Fails because L0 has overlapping files. + status = experimental::PromoteL0(db_, db_->DefaultColumnFamily()); + ASSERT_TRUE(status.IsInvalidArgument()); + + ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + // Now there is a file in L1. + ASSERT_GE(NumTableFilesAtLevel(1, 0), 1); + + ASSERT_OK(Put(Key(5), "")); + ASSERT_OK(Flush()); + // Fails because L1 is non-empty. + status = experimental::PromoteL0(db_, db_->DefaultColumnFamily()); + ASSERT_TRUE(status.IsInvalidArgument()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/experimental.cc b/db/experimental.cc index 0056d0a57..0b5018aef 100644 --- a/db/experimental.cc +++ b/db/experimental.cc @@ -16,11 +16,20 @@ Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) { auto dbimpl = dynamic_cast(db); if (dbimpl == nullptr) { - return Status::NotSupported("Didn't recognize DB object"); + return Status::InvalidArgument("Didn't recognize DB object"); } + return dbimpl->SuggestCompactRange(column_family, begin, end); } +Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) { + auto dbimpl = dynamic_cast(db); + if (dbimpl == nullptr) { + return Status::InvalidArgument("Didn't recognize DB object"); + } + return dbimpl->PromoteL0(column_family, target_level); +} + #else // ROCKSDB_LITE Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, @@ -28,6 +37,10 @@ Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, return Status::NotSupported("Not supported in RocksDB LITE"); } +Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) { + return Status::NotSupported("Not supported in RocksDB LITE"); +} + #endif // ROCKSDB_LITE Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) { diff --git a/include/rocksdb/experimental.h b/include/rocksdb/experimental.h index 35e7c240d..1d02e0238 100644 --- a/include/rocksdb/experimental.h +++ b/include/rocksdb/experimental.h @@ -16,5 +16,14 @@ Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end); Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end); +// Move all L0 files to target_level skipping compaction. +// This operation succeeds only if the files in L0 have disjoint ranges; this +// is guaranteed to happen, for instance, if keys are inserted in sorted +// order. Furthermore, all levels between 1 and target_level must be empty. +// If any of the above condition is violated, InvalidArgument will be +// returned. +Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, + int target_level = 1); + } // namespace experimental } // namespace rocksdb