From 4d2ba38b6508f464e0e8b295ffe7ed7ee4752386 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 31 Oct 2014 08:48:19 -0700 Subject: [PATCH] Make VersionBuilder unit testable Summary: Rename Version::Builder to VersionBuilder and expose its definition to a header. Make VerisonBuilder not reference Version or ColumnFamilyData, only working with VersionStorageInfo. Add version_builder_test which has a simple test. Test Plan: make all check Reviewers: rven, yhchiang, igor, ljin Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D27969 --- Makefile | 4 + db/column_family.cc | 6 +- db/compaction.cc | 4 +- db/compaction_picker_test.cc | 7 +- db/db_impl.cc | 23 +- db/db_impl_debug.cc | 7 +- db/flush_job.cc | 2 +- db/forward_iterator.cc | 6 +- db/internal_stats.cc | 6 +- db/version_builder.h | 40 +++ db/version_builder_test.cc | 123 +++++++ db/version_edit.h | 9 +- db/version_set.cc | 346 +++++++++++--------- db/version_set.h | 42 +-- util/ldb_cmd.cc | 2 +- utilities/compacted_db/compacted_db_impl.cc | 2 +- 16 files changed, 421 insertions(+), 208 deletions(-) create mode 100644 db/version_builder.h create mode 100644 db/version_builder_test.cc diff --git a/Makefile b/Makefile index 8642834b8..5ed8a5a67 100644 --- a/Makefile +++ b/Makefile @@ -133,6 +133,7 @@ TESTS = \ version_edit_test \ version_set_test \ compaction_picker_test \ + version_builder_test \ file_indexer_test \ write_batch_test \ write_controller_test\ @@ -464,6 +465,9 @@ version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) compaction_picker_test: db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +version_builder_test: db/version_builder_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/version_builder_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + file_indexer_test : db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/column_family.cc b/db/column_family.cc index e6298692a..b7497ecfe 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -324,7 +324,7 @@ ColumnFamilyData::~ColumnFamilyData() { void ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { if (current_ != nullptr) { - auto* vstorage = current_->GetStorageInfo(); + auto* vstorage = current_->storage_info(); const double score = vstorage->MaxCompactionScore(); const int max_level = vstorage->MaxCompactionScoreLevel(); @@ -405,7 +405,7 @@ void ColumnFamilyData::CreateNewMemtable( Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { auto* result = compaction_picker_->PickCompaction( - GetName(), mutable_options, current_->GetStorageInfo(), log_buffer); + GetName(), mutable_options, current_->storage_info(), log_buffer); if (result != nullptr) { result->SetInputVersion(current_); } @@ -418,7 +418,7 @@ Compaction* ColumnFamilyData::CompactRange( const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { auto* result = compaction_picker_->CompactRange( - GetName(), mutable_cf_options, current_->GetStorageInfo(), input_level, + GetName(), mutable_cf_options, current_->storage_info(), input_level, output_level, output_path_id, begin, end, compaction_end); if (result != nullptr) { result->SetInputVersion(current_); diff --git a/db/compaction.cc b/db/compaction.cc index a739da29e..6c76012db 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -129,7 +129,7 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { const Comparator* user_cmp = cfd_->user_comparator(); for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) { const std::vector& files = - input_version_->GetStorageInfo()->LevelFiles(lvl); + input_version_->storage_info()->LevelFiles(lvl); for (; level_ptrs_[lvl] < files.size(); ) { FileMetaData* f = files[level_ptrs_[lvl]]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { @@ -228,7 +228,7 @@ void Compaction::ReleaseCompactionFiles(Status status) { void Compaction::ResetNextCompactionIndex() { assert(input_version_ != nullptr); - input_version_->GetStorageInfo()->ResetNextCompactionIndex(start_level_); + input_version_->storage_info()->ResetNextCompactionIndex(start_level_); } namespace { diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index c302d2a2a..f094fbafb 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -50,9 +50,8 @@ class CompactionPickerTest { } ~CompactionPickerTest() { - auto* files = vstorage.GetFiles(); for (int i = 0; i < vstorage.NumberLevels(); i++) { - for (auto* f : files[i]) { + for (auto* f : vstorage.LevelFiles(i)) { delete f; } } @@ -63,13 +62,13 @@ class CompactionPickerTest { SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100) { assert(level < vstorage.NumberLevels()); - auto& files = vstorage.GetFiles()[level]; FileMetaData* f = new FileMetaData; f->fd = FileDescriptor(file_number, path_id, file_size); f->smallest = InternalKey(smallest, smallest_seq, kTypeValue); f->largest = InternalKey(largest, largest_seq, kTypeValue); f->compensated_file_size = file_size; - files.push_back(f); + f->refs = 0; + vstorage.MaybeAddFile(level, f); } void UpdateVersionStorageInfo() { diff --git a/db/db_impl.cc b/db/db_impl.cc index 78fb4ce13..231325cc3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1181,7 +1181,7 @@ Status DBImpl::FlushMemTableToOutputFile( } VersionStorageInfo::LevelSummaryStorage tmp; LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), - cfd->current()->GetStorageInfo()->LevelSummary(&tmp)); + cfd->current()->storage_info()->LevelSummary(&tmp)); if (disable_delete_obsolete_files_ == 0) { // add to deletion state @@ -1227,7 +1227,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, MutexLock l(&mutex_); Version* base = cfd->current(); for (int level = 1; level < cfd->NumberLevels(); level++) { - if (base->GetStorageInfo()->OverlapInLevel(level, begin, end)) { + if (base->storage_info()->OverlapInLevel(level, begin, end)) { max_level_with_files = level; } } @@ -1305,7 +1305,7 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family, int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, int level) { mutex_.AssertHeld(); - auto* vstorage = cfd->current()->GetStorageInfo(); + const auto* vstorage = cfd->current()->storage_info(); int minimum_level = level; for (int i = level - 1; i > 0; --i) { // stop if level i is not empty @@ -1364,7 +1364,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); - for (const auto& f : cfd->current()->GetStorageInfo()->files_[level]) { + for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) { edit.DeleteFile(level, f->fd.GetNumber()); edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, @@ -1580,7 +1580,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bool is_compaction_needed = false; // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->current()->GetStorageInfo()->NeedsCompaction()) { + if (cfd->current()->storage_info()->NeedsCompaction()) { is_compaction_needed = true; break; } @@ -1956,7 +1956,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, " bytes %s: %s\n", c->column_family_data()->GetName().c_str(), f->fd.GetNumber(), c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(), - c->input_version()->GetStorageInfo()->LevelSummary(&tmp)); + c->input_version()->storage_info()->LevelSummary(&tmp)); c->ReleaseCompactionFiles(status); *madeProgress = true; } else { @@ -2688,7 +2688,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogToBuffer(log_buffer, "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch); - assert(cfd->current()->GetStorageInfo()->NumLevelFiles( + assert(cfd->current()->storage_info()->NumLevelFiles( compact->compaction->level()) > 0); assert(compact->builder == nullptr); assert(!compact->outfile); @@ -2934,7 +2934,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", cfd->GetName().c_str(), - cfd->current()->GetStorageInfo()->LevelSummary(&tmp), + cfd->current()->storage_info()->LevelSummary(&tmp), (stats.bytes_readn + stats.bytes_readnp1) / static_cast(stats.micros), stats.bytes_written / static_cast(stats.micros), @@ -4040,7 +4040,7 @@ Status DBImpl::DeleteFile(std::string name) { // Only the files in the last level can be deleted externally. // This is to make sure that any deletion tombstones are not // lost. Check that the level passed is the last level. - auto* vstoreage = cfd->current()->GetStorageInfo(); + auto* vstoreage = cfd->current()->storage_info(); for (int i = level + 1; i < cfd->NumberLevels(); i++) { if (vstoreage->NumLevelFiles(i) != 0) { Log(db_options_.info_log, @@ -4049,7 +4049,8 @@ Status DBImpl::DeleteFile(std::string name) { } } // if level == 0, it has to be the oldest file - if (level == 0 && vstoreage->files_[0].back()->fd.GetNumber() != number) { + if (level == 0 && + vstoreage->LevelFiles(0).back()->fd.GetNumber() != number) { return Status::InvalidArgument("File in level 0, but not oldest"); } edit.SetColumnFamily(cfd->GetID()); @@ -4302,7 +4303,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, for (auto cfd : *impl->versions_->GetColumnFamilySet()) { if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - auto* vstorage = cfd->current()->GetStorageInfo(); + auto* vstorage = cfd->current()->storage_info(); for (int i = 1; i < vstorage->NumberLevels(); ++i) { int num_files = vstorage->NumLevelFiles(i); if (num_files > 0) { diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 2d67167ba..283f9393f 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -15,8 +15,7 @@ namespace rocksdb { uint64_t DBImpl::TEST_GetLevel0TotalSize() { MutexLock l(&mutex_); - return default_cf_handle_->cfd()->current()->GetStorageInfo()->NumLevelBytes( - 0); + return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0); } Iterator* DBImpl::TEST_NewInternalIterator(Arena* arena, @@ -46,7 +45,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( cfd = cfh->cfd(); } MutexLock l(&mutex_); - return cfd->current()->GetStorageInfo()->MaxNextLevelOverlappingBytes(); + return cfd->current()->storage_info()->MaxNextLevelOverlappingBytes(); } void DBImpl::TEST_GetFilesMetaData( @@ -58,7 +57,7 @@ void DBImpl::TEST_GetFilesMetaData( metadata->resize(NumberLevels()); for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = - cfd->current()->GetStorageInfo()->LevelFiles(level); + cfd->current()->storage_info()->LevelFiles(level); (*metadata)[level].clear(); for (const auto& f : files) { diff --git a/db/flush_job.cc b/db/flush_job.cc index fda80cea8..c477a5e8d 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -202,7 +202,7 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, if (base != nullptr && db_options_.max_background_compactions <= 1 && db_options_.max_background_flushes == 0 && cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { - level = base->GetStorageInfo()->PickLevelForMemTableOutput( + level = base->storage_info()->PickLevelForMemTableOutput( mutable_cf_options_, min_user_key, max_user_key); } edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 88415e5b8..154af1147 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -220,7 +220,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, if (!seek_to_first) { user_key = ExtractUserKey(internal_key); } - VersionStorageInfo* vstorage = sv_->current->GetStorageInfo(); + const VersionStorageInfo* vstorage = sv_->current->storage_info(); const std::vector& l0 = vstorage->LevelFiles(0); for (uint32_t i = 0; i < l0.size(); ++i) { if (seek_to_first) { @@ -430,7 +430,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); - auto* vstorage = sv_->current->GetStorageInfo(); + const auto* vstorage = sv_->current->storage_info(); const auto& l0_files = vstorage->LevelFiles(0); l0_iters_.reserve(l0_files.size()); for (const auto* l0 : l0_files) { @@ -454,7 +454,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { } void ForwardIterator::ResetIncompleteIterators() { - const auto& l0_files = sv_->current->GetStorageInfo()->LevelFiles(0); + const auto& l0_files = sv_->current->storage_info()->LevelFiles(0); for (uint32_t i = 0; i < l0_iters_.size(); ++i) { assert(i < l0_files.size()); if (!l0_iters_[i]->status().IsIncomplete()) { diff --git a/db/internal_stats.cc b/db/internal_stats.cc index ca0a8d62c..1440dbe42 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -170,7 +170,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type, std::string* value) { assert(value != nullptr); auto* current = cfd_->current(); - auto* vstorage = current->GetStorageInfo(); + const auto* vstorage = current->storage_info(); Slice in = property; switch (property_type) { @@ -230,7 +230,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type, bool InternalStats::GetIntProperty(DBPropertyType property_type, uint64_t* value, DBImpl* db) const { - auto* vstorage = cfd_->current()->GetStorageInfo(); + const auto* vstorage = cfd_->current()->storage_info(); switch (property_type) { case kNumImmutableMemTable: @@ -366,7 +366,7 @@ void InternalStats::DumpDBStats(std::string* value) { } void InternalStats::DumpCFStats(std::string* value) { - VersionStorageInfo* vstorage = cfd_->current()->GetStorageInfo(); + const VersionStorageInfo* vstorage = cfd_->current()->storage_info(); int num_levels_to_check = (cfd_->options()->compaction_style != kCompactionStyleUniversal && diff --git a/db/version_builder.h b/db/version_builder.h new file mode 100644 index 000000000..f8c91a88c --- /dev/null +++ b/db/version_builder.h @@ -0,0 +1,40 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +#pragma once +#include "rocksdb/env.h" + +namespace rocksdb { + +class TableCache; +class VersionStorageInfo; +class VersionEdit; +class FileMetaData; + +// A helper class so we can efficiently apply a whole sequence +// of edits to a particular state without creating intermediate +// Versions that contain full copies of the intermediate state. +class VersionBuilder { + public: + VersionBuilder(const EnvOptions& env_options, TableCache* table_cache, + VersionStorageInfo* base_vstorage); + ~VersionBuilder(); + void CheckConsistency(VersionStorageInfo* vstorage); + void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, + int level); + void Apply(VersionEdit* edit); + void SaveTo(VersionStorageInfo* vstorage); + void LoadTableHandlers(); + void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); + + private: + class Rep; + Rep* rep_; +}; +} // namespace rocksdb diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc new file mode 100644 index 000000000..e11f78eb1 --- /dev/null +++ b/db/version_builder_test.cc @@ -0,0 +1,123 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include "db/version_edit.h" +#include "db/version_set.h" +#include "util/logging.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +class VersionBuilderTest { + public: + const Comparator* ucmp; + InternalKeyComparator icmp; + Options options; + ImmutableCFOptions ioptions; + MutableCFOptions mutable_cf_options; + VersionStorageInfo vstorage; + uint32_t file_num; + CompactionOptionsFIFO fifo_options; + std::vector size_being_compacted; + + VersionBuilderTest() + : ucmp(BytewiseComparator()), + icmp(ucmp), + ioptions(options), + mutable_cf_options(options, ioptions), + vstorage(&icmp, ucmp, options.num_levels, kCompactionStyleLevel, + nullptr), + file_num(1) { + mutable_cf_options.RefreshDerivedOptions(ioptions); + size_being_compacted.resize(options.num_levels); + } + + ~VersionBuilderTest() { + for (int i = 0; i < vstorage.NumberLevels(); i++) { + for (auto* f : vstorage.LevelFiles(i)) { + if (--f->refs == 0) { + delete f; + } + } + } + } + + InternalKey GetInternalKey(const char* ukey, + SequenceNumber smallest_seq = 100) { + return InternalKey(ukey, smallest_seq, kTypeValue); + } + + void Add(int level, uint32_t file_number, const char* smallest, + const char* largest, uint64_t file_size = 0, uint32_t path_id = 0, + SequenceNumber smallest_seq = 100, + SequenceNumber largest_seq = 100) { + assert(level < vstorage.NumberLevels()); + FileMetaData* f = new FileMetaData; + f->fd = FileDescriptor(file_number, path_id, file_size); + f->smallest = GetInternalKey(smallest, smallest_seq); + f->largest = GetInternalKey(largest, largest_seq); + f->compensated_file_size = file_size; + f->refs = 0; + vstorage.MaybeAddFile(level, f); + } + + void UpdateVersionStorageInfo() { + vstorage.ComputeCompactionScore(mutable_cf_options, fifo_options, + size_being_compacted); + vstorage.UpdateFilesBySize(); + vstorage.UpdateNumNonEmptyLevels(); + vstorage.GenerateFileIndexer(); + vstorage.GenerateLevelFilesBrief(); + vstorage.SetFinalized(); + } +}; + +TEST(VersionBuilderTest, ApplyAndSaveTo) { + Add(0, 1U, "150", "200", 100U); + // Level 1 score 1.2 + Add(1, 66U, "150", "200", 100U); + Add(1, 88U, "201", "300", 100U); + // Level 2 score 1.8. File 7 is the largest. Should be picked + Add(2, 6U, "150", "179", 100U); + Add(2, 7U, "180", "220", 100U); + Add(2, 8U, "221", "300", 100U); + // Level 3 score slightly larger than 1 + Add(3, 26U, "150", "170", 100U); + Add(3, 27U, "171", "179", 100U); + Add(3, 28U, "191", "220", 100U); + Add(3, 29U, "221", "300", 100U); + UpdateVersionStorageInfo(); + + VersionEdit version_edit; + version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"), + GetInternalKey("350"), 200, 200); + version_edit.DeleteFile(3, 27U); + + EnvOptions env_options; + + VersionBuilder version_builder(env_options, nullptr, &vstorage); + + VersionStorageInfo new_vstorage(&icmp, ucmp, options.num_levels, + kCompactionStyleLevel, nullptr); + version_builder.Apply(&version_edit); + version_builder.SaveTo(&new_vstorage); + + ASSERT_EQ(400U, new_vstorage.NumLevelBytes(2)); + ASSERT_EQ(300U, new_vstorage.NumLevelBytes(3)); + + for (int i = 0; i < new_vstorage.NumberLevels(); i++) { + for (auto* f : new_vstorage.LevelFiles(i)) { + if (--f->refs == 0) { + delete f; + } + } + } +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } diff --git a/db/version_edit.h b/db/version_edit.h index 3317b11c4..f8e71d2e9 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -217,14 +217,19 @@ class VersionEdit { bool EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); + typedef std::set> DeletedFileSet; + + const DeletedFileSet& GetDeletedFiles() { return deleted_files_; } + const std::vector>& GetNewFiles() { + return new_files_; + } + std::string DebugString(bool hex_key = false) const; private: friend class VersionSet; friend class Version; - typedef std::set< std::pair> DeletedFileSet; - bool GetLevel(Slice* input, int* level, const char** msg); int max_level_; diff --git a/db/version_set.cc b/db/version_set.cc index 0069ef6b0..a195a7168 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -29,6 +29,7 @@ #include "db/merge_context.h" #include "db/table_cache.h" #include "db/compaction.h" +#include "db/version_builder.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "table/table_reader.h" @@ -315,9 +316,9 @@ Version::~Version() { next_->prev_ = prev_; // Drop references to files - for (int level = 0; level < vstorage_.num_levels_; level++) { - for (size_t i = 0; i < vstorage_.files_[level].size(); i++) { - FileMetaData* f = vstorage_.files_[level][i]; + for (int level = 0; level < storage_info_.num_levels_; level++) { + for (size_t i = 0; i < storage_info_.files_[level].size(); i++) { + FileMetaData* f = storage_info_.files_[level][i]; assert(f->refs > 0); f->refs--; if (f->refs <= 0) { @@ -512,6 +513,23 @@ class LevelFileIteratorState : public TwoLevelIteratorState { bool for_compaction_; }; +// A wrapper of version builder which references the current version in +// constructor and unref it in the destructor. +class BaseReferencedVersionBuilder { + public: + explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd) + : version_builder_(cfd->current()->version_set()->GetEnvOptions(), + cfd->table_cache(), cfd->current()->storage_info()), + version_(cfd->current()) { + version_->Ref(); + } + ~BaseReferencedVersionBuilder() { version_->Unref(); } + VersionBuilder* GetVersionBuilder() { return &version_builder_; } + + private: + VersionBuilder version_builder_; + Version* version_; +}; } // anonymous namespace Status Version::GetTableProperties(std::shared_ptr* tp, @@ -565,8 +583,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, } Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { - for (int level = 0; level < vstorage_.num_levels_; level++) { - for (const auto& file_meta : vstorage_.files_[level]) { + for (int level = 0; level < storage_info_.num_levels_; level++) { + for (const auto& file_meta : storage_info_.files_[level]) { auto fname = TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); @@ -587,7 +605,7 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { size_t Version::GetMemoryUsageByTableReaders() { size_t total_usage = 0; - for (auto& file_level : vstorage_.level_files_brief_) { + for (auto& file_level : storage_info_.level_files_brief_) { for (size_t i = 0; i < file_level.num_files; i++) { total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( vset_->env_options_, cfd_->internal_comparator(), @@ -597,7 +615,7 @@ size_t Version::GetMemoryUsageByTableReaders() { return total_usage; } -uint64_t VersionStorageInfo::GetEstimatedActiveKeys() { +uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const { // Estimation will be not accurate when: // (1) there is merge keys // (2) keys are directly overwritten @@ -620,11 +638,11 @@ uint64_t VersionStorageInfo::GetEstimatedActiveKeys() { void Version::AddIterators(const ReadOptions& read_options, const EnvOptions& soptions, MergeIteratorBuilder* merge_iter_builder) { - assert(vstorage_.finalized_); + assert(storage_info_.finalized_); // Merge all level zero files together since they may overlap - for (size_t i = 0; i < vstorage_.level_files_brief_[0].num_files; i++) { - const auto& file = vstorage_.level_files_brief_[0].files[i]; + for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) { + const auto& file = storage_info_.LevelFilesBrief(0).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, false, merge_iter_builder->GetArena())); @@ -633,15 +651,15 @@ void Version::AddIterators(const ReadOptions& read_options, // For levels > 0, we can use a concatenating iterator that sequentially // walks through the non-overlapping files in the level, opening them // lazily. - for (int level = 1; level < vstorage_.num_levels_; level++) { - if (vstorage_.level_files_brief_[level].num_files != 0) { + for (int level = 1; level < storage_info_.NumberLevels(); level++) { + if (storage_info_.level_files_brief_[level].num_files != 0) { merge_iter_builder->AddIterator(NewTwoLevelIterator( new LevelFileIteratorState( cfd_->table_cache(), read_options, soptions, cfd_->internal_comparator(), false /* for_compaction */, cfd_->ioptions()->prefix_extractor != nullptr), new LevelFileNumIterator(cfd_->internal_comparator(), - &vstorage_.level_files_brief_[level]), + &storage_info_.LevelFilesBrief(level)), merge_iter_builder->GetArena())); } } @@ -689,14 +707,14 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, table_cache_((cfd == nullptr) ? nullptr : cfd->table_cache()), merge_operator_((cfd == nullptr) ? nullptr : cfd->ioptions()->merge_operator), - vstorage_((cfd == nullptr) ? nullptr : &cfd->internal_comparator(), - (cfd == nullptr) ? nullptr : cfd->user_comparator(), - cfd == nullptr ? 0 : cfd->NumberLevels(), - cfd == nullptr ? kCompactionStyleLevel - : cfd->ioptions()->compaction_style, - (cfd == nullptr || cfd->current() == nullptr) - ? nullptr - : cfd->current()->GetStorageInfo()), + storage_info_((cfd == nullptr) ? nullptr : &cfd->internal_comparator(), + (cfd == nullptr) ? nullptr : cfd->user_comparator(), + cfd == nullptr ? 0 : cfd->NumberLevels(), + cfd == nullptr ? kCompactionStyleLevel + : cfd->ioptions()->compaction_style, + (cfd == nullptr || cfd->current() == nullptr) + ? nullptr + : cfd->current()->storage_info()), vset_(vset), next_(this), prev_(this), @@ -715,16 +733,17 @@ void Version::Get(const ReadOptions& read_options, assert(status->ok() || status->IsMergeInProgress()); GetContext get_context( - GetUserComparator(), merge_operator_, info_log_, db_statistics_, + user_comparator(), merge_operator_, info_log_, db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, value, value_found, merge_context); - FilePicker fp(vstorage_.files_, user_key, ikey, &vstorage_.level_files_brief_, - vstorage_.num_non_empty_levels_, &vstorage_.file_indexer_, - GetUserComparator(), GetInternalComparator()); + FilePicker fp( + storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_, + storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, + user_comparator(), internal_comparator()); FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { - *status = table_cache_->Get(read_options, *GetInternalComparator(), f->fd, + *status = table_cache_->Get(read_options, *internal_comparator(), f->fd, ikey, &get_context); // TODO: examine the behavior for corrupted key if (!status->ok()) { @@ -783,13 +802,13 @@ void VersionStorageInfo::GenerateLevelFilesBrief() { void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, std::vector& size_being_compacted) { UpdateAccumulatedStats(); - vstorage_.ComputeCompactionScore(mutable_cf_options, - cfd_->ioptions()->compaction_options_fifo, - size_being_compacted); - vstorage_.UpdateFilesBySize(); - vstorage_.UpdateNumNonEmptyLevels(); - vstorage_.GenerateFileIndexer(); - vstorage_.GenerateLevelFilesBrief(); + storage_info_.ComputeCompactionScore( + mutable_cf_options, cfd_->ioptions()->compaction_options_fifo, + size_being_compacted); + storage_info_.UpdateFilesBySize(); + storage_info_.UpdateNumNonEmptyLevels(); + storage_info_.GenerateFileIndexer(); + storage_info_.GenerateLevelFilesBrief(); } bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { @@ -841,11 +860,12 @@ void Version::UpdateAccumulatedStats() { // will be triggered, which creates higher-level files whose num_deletions // will be updated here. for (int level = 0; - level < vstorage_.num_levels_ && init_count < kMaxInitCount; ++level) { - for (auto* file_meta : vstorage_.files_[level]) { + level < storage_info_.num_levels_ && init_count < kMaxInitCount; + ++level) { + for (auto* file_meta : storage_info_.files_[level]) { if (MaybeInitializeFileMetaData(file_meta)) { // each FileMeta will be initialized only once. - vstorage_.UpdateAccumulatedStats(file_meta); + storage_info_.UpdateAccumulatedStats(file_meta); if (++init_count >= kMaxInitCount) { break; } @@ -855,17 +875,17 @@ void Version::UpdateAccumulatedStats() { // In case all sampled-files contain only deletion entries, then we // load the table-property of a file in higher-level to initialize // that value. - for (int level = vstorage_.num_levels_ - 1; - vstorage_.accumulated_raw_value_size_ == 0 && level >= 0; --level) { - for (int i = static_cast(vstorage_.files_[level].size()) - 1; - vstorage_.accumulated_raw_value_size_ == 0 && i >= 0; --i) { - if (MaybeInitializeFileMetaData(vstorage_.files_[level][i])) { - vstorage_.UpdateAccumulatedStats(vstorage_.files_[level][i]); + for (int level = storage_info_.num_levels_ - 1; + storage_info_.accumulated_raw_value_size_ == 0 && level >= 0; --level) { + for (int i = static_cast(storage_info_.files_[level].size()) - 1; + storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) { + if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) { + storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]); } } } - vstorage_.ComputeCompensatedSizes(); + storage_info_.ComputeCompensatedSizes(); } void VersionStorageInfo::ComputeCompensatedSizes() { @@ -987,6 +1007,18 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { } // anonymous namespace +void VersionStorageInfo::MaybeAddFile(int level, FileMetaData* f) { + assert(level < NumberLevels()); + auto* level_files = &files_[level]; + // Must not overlap + assert(level <= 0 || level_files->empty() || + internal_comparator_->Compare( + (*level_files)[level_files->size() - 1]->largest, f->smallest) < + 0); + f->refs++; + level_files->push_back(f); +} + void VersionStorageInfo::UpdateNumNonEmptyLevels() { num_non_empty_levels_ = num_levels_; for (int i = num_levels_ - 1; i >= 0; i--) { @@ -1379,8 +1411,8 @@ int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() { } void Version::AddLiveFiles(std::vector* live) { - for (int level = 0; level < vstorage_.NumberLevels(); level++) { - const std::vector& files = vstorage_.files_[level]; + for (int level = 0; level < storage_info_.NumberLevels(); level++) { + const std::vector& files = storage_info_.files_[level]; for (const auto& file : files) { live->push_back(file->fd); } @@ -1389,7 +1421,7 @@ void Version::AddLiveFiles(std::vector* live) { std::string Version::DebugString(bool hex) const { std::string r; - for (int level = 0; level < vstorage_.num_levels_; level++) { + for (int level = 0; level < storage_info_.num_levels_; level++) { // E.g., // --- level 1 --- // 17:123['a' .. 'd'] @@ -1399,7 +1431,7 @@ std::string Version::DebugString(bool hex) const { r.append(" --- version# "); AppendNumberTo(&r, version_number_); r.append(" ---\n"); - const std::vector& files = vstorage_.files_[level]; + const std::vector& files = storage_info_.files_[level]; for (size_t i = 0; i < files.size(); i++) { r.push_back(' '); AppendNumberTo(&r, files[i]->fd.GetNumber()); @@ -1428,10 +1460,7 @@ struct VersionSet::ManifestWriter { : done(false), cv(mu), cfd(cfd), edit(e) {} }; -// A helper class so we can efficiently apply a whole sequence -// of edits to a particular state without creating intermediate -// Versions that contain full copies of the intermediate state. -class VersionSet::Builder { +class VersionBuilder::Rep { private: // Helper to sort files_ in v // kLevel0 -- NewestFirstBySeqNo @@ -1461,30 +1490,33 @@ class VersionSet::Builder { FileSet* added_files; }; - ColumnFamilyData* cfd_; - Version* base_; + const EnvOptions& env_options_; + TableCache* table_cache_; + VersionStorageInfo* base_vstorage_; LevelState* levels_; FileComparator level_zero_cmp_; FileComparator level_nonzero_cmp_; public: - Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) { - base_->Ref(); - levels_ = new LevelState[base_->GetStorageInfo()->NumberLevels()]; + Rep(const EnvOptions& env_options, TableCache* table_cache, + VersionStorageInfo* base_vstorage) + : env_options_(env_options), + table_cache_(table_cache), + base_vstorage_(base_vstorage) { + levels_ = new LevelState[base_vstorage_->NumberLevels()]; level_zero_cmp_.sort_method = FileComparator::kLevel0; level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; - level_nonzero_cmp_.internal_comparator = &cfd->internal_comparator(); + level_nonzero_cmp_.internal_comparator = + base_vstorage_->InternalComparator(); levels_[0].added_files = new FileSet(level_zero_cmp_); - for (int level = 1; level < base_->GetStorageInfo()->NumberLevels(); - level++) { + for (int level = 1; level < base_vstorage_->NumberLevels(); level++) { levels_[level].added_files = new FileSet(level_nonzero_cmp_); } } - ~Builder() { - for (int level = 0; level < base_->GetStorageInfo()->NumberLevels(); - level++) { + ~Rep() { + for (int level = 0; level < base_vstorage_->NumberLevels(); level++) { const FileSet* added = levels_[level].added_files; std::vector to_unref; to_unref.reserve(added->size()); @@ -1498,7 +1530,8 @@ class VersionSet::Builder { f->refs--; if (f->refs <= 0) { if (f->table_reader_handle) { - cfd_->table_cache()->ReleaseHandle(f->table_reader_handle); + assert(table_cache_ != nullptr); + table_cache_->ReleaseHandle(f->table_reader_handle); f->table_reader_handle = nullptr; } delete f; @@ -1507,17 +1540,16 @@ class VersionSet::Builder { } delete[] levels_; - base_->Unref(); } - void CheckConsistency(Version* v) { + void CheckConsistency(VersionStorageInfo* vstorage) { #ifndef NDEBUG // make sure the files are sorted correctly - auto* files = v->GetFiles(); - for (int level = 0; level < v->GetStorageInfo()->NumberLevels(); level++) { - for (size_t i = 1; i < files[level].size(); i++) { - auto f1 = files[level][i - 1]; - auto f2 = files[level][i]; + for (int level = 0; level < vstorage->NumberLevels(); level++) { + auto& level_files = vstorage->LevelFiles(level); + for (size_t i = 1; i < level_files.size(); i++) { + auto f1 = level_files[i - 1]; + auto f2 = level_files[i]; if (level == 0) { assert(level_zero_cmp_(f1, f2)); assert(f1->largest_seqno > f2->largest_seqno); @@ -1525,8 +1557,8 @@ class VersionSet::Builder { assert(level_nonzero_cmp_(f1, f2)); // Make sure there is no overlap in levels > 0 - if (cfd_->internal_comparator().Compare(f1->largest, f2->smallest) >= - 0) { + if (vstorage->InternalComparator()->Compare(f1->largest, + f2->smallest) >= 0) { fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", (f1->largest).DebugString().c_str(), (f2->smallest).DebugString().c_str()); @@ -1543,10 +1575,9 @@ class VersionSet::Builder { #ifndef NDEBUG // a file to be deleted better exist in the previous version bool found = false; - auto* files = base_->GetFiles(); - for (int l = 0; !found && l < base_->GetStorageInfo()->NumberLevels(); - l++) { - const std::vector& base_files = files[l]; + for (int l = 0; !found && l < base_vstorage_->NumberLevels(); l++) { + const std::vector& base_files = + base_vstorage_->LevelFiles(l); for (unsigned int i = 0; i < base_files.size(); i++) { FileMetaData* f = base_files[i]; if (f->fd.GetNumber() == number) { @@ -1558,8 +1589,8 @@ class VersionSet::Builder { // if the file did not exist in the previous version, then it // is possibly moved from lower level to higher level in current // version - for (int l = level + 1; - !found && l < base_->GetStorageInfo()->NumberLevels(); l++) { + for (int l = level + 1; !found && l < base_vstorage_->NumberLevels(); + l++) { const FileSet* added = levels_[l].added_files; for (FileSet::const_iterator added_iter = added->begin(); added_iter != added->end(); ++added_iter) { @@ -1592,10 +1623,10 @@ class VersionSet::Builder { // Apply all of the edits in *edit to the current state. void Apply(VersionEdit* edit) { - CheckConsistency(base_); + CheckConsistency(base_vstorage_); // Delete files - const VersionEdit::DeletedFileSet& del = edit->deleted_files_; + const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles(); for (const auto& del_file : del) { const auto level = del_file.first; const auto number = del_file.second; @@ -1604,7 +1635,7 @@ class VersionSet::Builder { } // Add new files - for (const auto& new_file : edit->new_files_) { + for (const auto& new_file : edit->GetNewFiles()) { const int level = new_file.first; FileMetaData* f = new FileMetaData(new_file.second); f->refs = 1; @@ -1615,77 +1646,88 @@ class VersionSet::Builder { } // Save the current state in *v. - void SaveTo(Version* v) { - CheckConsistency(base_); - CheckConsistency(v); + void SaveTo(VersionStorageInfo* vstorage) { + CheckConsistency(base_vstorage_); + CheckConsistency(vstorage); - auto* out_files = v->GetFiles(); - for (int level = 0; level < base_->GetStorageInfo()->NumberLevels(); - level++) { + for (int level = 0; level < base_vstorage_->NumberLevels(); level++) { const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. - const auto& base_files = base_->GetStorageInfo()->LevelFiles(level); + const auto& base_files = base_vstorage_->LevelFiles(level); auto base_iter = base_files.begin(); auto base_end = base_files.end(); const auto& added_files = *levels_[level].added_files; - out_files[level].reserve(base_files.size() + added_files.size()); + vstorage->Reserve(level, base_files.size() + added_files.size()); for (const auto& added : added_files) { // Add all smaller files listed in base_ for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp); base_iter != bpos; ++base_iter) { - MaybeAddFile(v, level, *base_iter); + MaybeAddFile(vstorage, level, *base_iter); } - MaybeAddFile(v, level, added); + MaybeAddFile(vstorage, level, added); } // Add remaining base files for (; base_iter != base_end; ++base_iter) { - MaybeAddFile(v, level, *base_iter); + MaybeAddFile(vstorage, level, *base_iter); } } - CheckConsistency(v); + CheckConsistency(vstorage); } void LoadTableHandlers() { - for (int level = 0; level < cfd_->NumberLevels(); level++) { + assert(table_cache_ != nullptr); + for (int level = 0; level < base_vstorage_->NumberLevels(); level++) { for (auto& file_meta : *(levels_[level].added_files)) { - assert (!file_meta->table_reader_handle); - cfd_->table_cache()->FindTable( - base_->GetVersionSet()->env_options_, cfd_->internal_comparator(), + assert(!file_meta->table_reader_handle); + table_cache_->FindTable( + env_options_, *(base_vstorage_->InternalComparator()), file_meta->fd, &file_meta->table_reader_handle, false); if (file_meta->table_reader_handle != nullptr) { // Load table_reader - file_meta->fd.table_reader = - cfd_->table_cache()->GetTableReaderFromHandle( - file_meta->table_reader_handle); - } + file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle( + file_meta->table_reader_handle); } } } + } - void MaybeAddFile(Version* v, int level, FileMetaData* f) { + void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) { // File is deleted: do nothing } else { - auto* files = v->GetFiles(); - auto* level_files = &files[level]; - if (level > 0 && !level_files->empty()) { - // Must not overlap - assert(cfd_->internal_comparator().Compare( - (*level_files)[level_files->size() - 1]->largest, - f->smallest) < 0); - } - f->refs++; - level_files->push_back(f); + vstorage->MaybeAddFile(level, f); } } }; +VersionBuilder::VersionBuilder(const EnvOptions& env_options, + TableCache* table_cache, + VersionStorageInfo* base_vstorage) + : rep_(new Rep(env_options, table_cache, base_vstorage)) {} +VersionBuilder::~VersionBuilder() { delete rep_; } +void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) { + rep_->CheckConsistency(vstorage); +} +void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit, + uint64_t number, int level) { + rep_->CheckConsistencyForDeletes(edit, number, level); +} +void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); } +void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) { + rep_->SaveTo(vstorage); +} +void VersionBuilder::LoadTableHandlers() { rep_->LoadTableHandlers(); } +void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, + FileMetaData* f) { + rep_->MaybeAddFile(vstorage, level, f); +} + VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& env_options, Cache* table_cache, WriteController* write_controller) @@ -1717,7 +1759,7 @@ VersionSet::~VersionSet() { void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Version* v) { // Mark v finalized - v->vstorage_.SetFinalized(); + v->storage_info_.SetFinalized(); // Make "v" current assert(v->refs_ == 0); @@ -1773,7 +1815,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, std::vector batch_edits; Version* v = nullptr; - std::unique_ptr builder(nullptr); + std::unique_ptr builder_guard(nullptr); // process all requests in the queue ManifestWriter* last_writer = &w; @@ -1785,7 +1827,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, batch_edits.push_back(edit); } else { v = new Version(column_family_data, this, current_version_number_++); - builder.reset(new Builder(column_family_data)); + builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data)); + auto* builder = builder_guard->GetVersionBuilder(); for (const auto& writer : manifest_writers_) { if (writer->edit->IsColumnFamilyManipulation() || writer->cfd->GetID() != column_family_data->GetID()) { @@ -1794,11 +1837,10 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, break; } last_writer = writer; - LogAndApplyHelper(column_family_data, builder.get(), v, last_writer->edit, - mu); + LogAndApplyHelper(column_family_data, builder, v, last_writer->edit, mu); batch_edits.push_back(last_writer->edit); } - builder->SaveTo(v); + builder->SaveTo(v->storage_info()); } // Initialize new descriptor log file if necessary by creating @@ -1828,7 +1870,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, { std::vector size_being_compacted; if (!edit->IsColumnFamilyManipulation()) { - size_being_compacted.resize(v->GetStorageInfo()->NumberLevels() - 1); + size_being_compacted.resize(v->storage_info()->NumberLevels() - 1); // calculate the amount of data being compacted at every level column_family_data->compaction_picker()->SizeBeingCompacted( size_being_compacted); @@ -1840,7 +1882,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, db_options_->max_open_files == -1) { // unlimited table cache. Pre-load table handle now. // Need to do it out of the mutex. - builder->LoadTableHandlers(); + builder_guard->GetVersionBuilder()->LoadTableHandlers(); } // This is fine because everything inside of this block is serialized -- @@ -2019,9 +2061,9 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { } } -void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder, - Version* v, VersionEdit* edit, - port::Mutex* mu) { +void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, + VersionBuilder* builder, Version* v, + VersionEdit* edit, port::Mutex* mu) { mu->AssertHeld(); assert(!edit->IsColumnFamilyManipulation()); @@ -2097,7 +2139,7 @@ Status VersionSet::Recover( uint64_t log_number = 0; uint64_t prev_log_number = 0; uint32_t max_column_family = 0; - std::unordered_map builders; + std::unordered_map builders; // add default column family auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName); @@ -2109,7 +2151,7 @@ Status VersionSet::Recover( default_cf_edit.SetColumnFamily(0); ColumnFamilyData* default_cfd = CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - builders.insert({0, new Builder(default_cfd)}); + builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); { VersionSet::LogReporter reporter; @@ -2155,7 +2197,8 @@ Status VersionSet::Recover( {edit.column_family_, edit.column_family_name_}); } else { cfd = CreateColumnFamily(cf_options->second, &edit); - builders.insert({edit.column_family_, new Builder(cfd)}); + builders.insert( + {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); } } else if (edit.is_column_family_drop_) { if (cf_in_builders) { @@ -2188,8 +2231,7 @@ Status VersionSet::Recover( cfd = column_family_set_->GetColumnFamily(edit.column_family_); // this should never happen since cf_in_builders is true assert(cfd != nullptr); - if (edit.max_level_ >= - cfd->current()->GetStorageInfo()->NumberLevels()) { + if (edit.max_level_ >= cfd->current()->storage_info()->NumberLevels()) { s = Status::InvalidArgument( "db has more levels than options.num_levels"); break; @@ -2200,7 +2242,7 @@ Status VersionSet::Recover( // to builder auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - builder->second->Apply(&edit); + builder->second->GetVersionBuilder()->Apply(&edit); } if (cfd != nullptr) { @@ -2280,7 +2322,7 @@ Status VersionSet::Recover( for (auto cfd : *column_family_set_) { auto builders_iter = builders.find(cfd->GetID()); assert(builders_iter != builders.end()); - auto builder = builders_iter->second; + auto builder = builders_iter->second->GetVersionBuilder(); if (db_options_->max_open_files == -1) { // unlimited table cache. Pre-load table handle now. @@ -2289,11 +2331,11 @@ Status VersionSet::Recover( } Version* v = new Version(cfd, this, current_version_number_++); - builder->SaveTo(v); + builder->SaveTo(v->storage_info()); // Install recovered version std::vector size_being_compacted( - v->GetStorageInfo()->NumberLevels() - 1); + v->storage_info()->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); AppendVersion(cfd, v); @@ -2425,7 +2467,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, Version* current_version = versions.GetColumnFamilySet()->GetDefault()->current(); - auto* vstorage = current_version->GetStorageInfo(); + auto* vstorage = current_version->storage_info(); int current_levels = vstorage->NumberLevels(); if (current_levels <= new_levels) { @@ -2454,18 +2496,17 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, } } - std::vector* old_files_list = vstorage->GetFiles(); // we need to allocate an array with the old number of levels size to // avoid SIGSEGV in WriteSnapshot() // however, all levels bigger or equal to new_levels will be empty std::vector* new_files_list = new std::vector[current_levels]; for (int i = 0; i < new_levels - 1; i++) { - new_files_list[i] = old_files_list[i]; + new_files_list[i] = vstorage->LevelFiles(i); } if (first_nonempty_level > 0) { - new_files_list[new_levels - 1] = old_files_list[first_nonempty_level]; + new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level); } delete[] vstorage -> files_; @@ -2498,7 +2539,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t prev_log_number = 0; int count = 0; std::unordered_map comparators; - std::unordered_map builders; + std::unordered_map builders; // add default column family VersionEdit default_cf_edit; @@ -2506,7 +2547,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, default_cf_edit.SetColumnFamily(0); ColumnFamilyData* default_cfd = CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit); - builders.insert({0, new Builder(default_cfd)}); + builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); { VersionSet::LogReporter reporter; @@ -2545,7 +2586,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, break; } cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit); - builders.insert({edit.column_family_, new Builder(cfd)}); + builders.insert( + {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); } else if (edit.is_column_family_drop_) { if (!cf_in_builders) { s = Status::Corruption( @@ -2577,7 +2619,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, // to builder auto builder = builders.find(edit.column_family_); assert(builder != builders.end()); - builder->second->Apply(&edit); + builder->second->GetVersionBuilder()->Apply(&edit); } if (cfd != nullptr && edit.has_log_number_) { @@ -2624,12 +2666,12 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, for (auto cfd : *column_family_set_) { auto builders_iter = builders.find(cfd->GetID()); assert(builders_iter != builders.end()); - auto builder = builders_iter->second; + auto builder = builders_iter->second->GetVersionBuilder(); Version* v = new Version(cfd, this, current_version_number_++); - builder->SaveTo(v); + builder->SaveTo(v->storage_info()); std::vector size_being_compacted( - v->GetStorageInfo()->NumberLevels() - 1); + v->storage_info()->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); delete builder; @@ -2706,8 +2748,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { edit.SetColumnFamily(cfd->GetID()); for (int level = 0; level < cfd->NumberLevels(); level++) { - auto* files = cfd->current()->GetFiles(); - for (const auto& f : files[level]) { + for (const auto& f : + cfd->current()->storage_info()->LevelFiles(level)) { edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); @@ -2762,7 +2804,7 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_number, uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; - auto* vstorage = v->GetStorageInfo(); + const auto* vstorage = v->storage_info(); for (int level = 0; level < vstorage->NumberLevels(); level++) { const std::vector& files = vstorage->LevelFiles(level); for (size_t i = 0; i < files.size(); i++) { @@ -2803,7 +2845,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Version* dummy_versions = cfd->dummy_versions(); for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { - auto* vstorage = v->GetStorageInfo(); + const auto* vstorage = v->storage_info(); for (int level = 0; level < vstorage->NumberLevels(); level++) { total_files += vstorage->LevelFiles(level).size(); } @@ -2817,7 +2859,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Version* dummy_versions = cfd->dummy_versions(); for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { - auto* vstorage = v->GetStorageInfo(); + const auto* vstorage = v->storage_info(); for (int level = 0; level < vstorage->NumberLevels(); level++) { for (const auto& f : vstorage->LevelFiles(level)) { live_list->push_back(f->fd); @@ -2875,7 +2917,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG Version* version = c->column_family_data()->current(); - VersionStorageInfo* vstorage = version->GetStorageInfo(); + const VersionStorageInfo* vstorage = version->storage_info(); if (c->input_version() != version) { Log(db_options_->info_log, "[%s] VerifyCompactionFileConsistency version mismatch", @@ -2927,7 +2969,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, ColumnFamilyData** cfd) { for (auto cfd_iter : *column_family_set_) { Version* version = cfd_iter->current(); - auto* vstorage = version->GetStorageInfo(); + const auto* vstorage = version->storage_info(); for (int level = 0; level < vstorage->NumberLevels(); level++) { for (const auto& file : vstorage->LevelFiles(level)) { if (file->fd.GetNumber() == number) { @@ -2944,9 +2986,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (auto cfd : *column_family_set_) { - auto* files = cfd->current()->GetFiles(); for (int level = 0; level < cfd->NumberLevels(); level++) { - for (const auto& file : files[level]) { + for (const auto& file : + cfd->current()->storage_info()->LevelFiles(level)) { LiveFileMetaData filemetadata; filemetadata.column_family_name = cfd->GetName(); uint32_t path_id = file->fd.GetPathId(); diff --git a/db/version_set.h b/db/version_set.h index 98ce172e3..44e6f94b2 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -26,6 +26,7 @@ #include #include #include "db/dbformat.h" +#include "db/version_builder.h" #include "db/version_edit.h" #include "port/port.h" #include "db/table_cache.h" @@ -91,6 +92,10 @@ class VersionStorageInfo { VersionStorageInfo* src_vstorage); ~VersionStorageInfo(); + void Reserve(int level, size_t size) { files_[level].reserve(size); } + + void MaybeAddFile(int level, FileMetaData* f); + void SetFinalized() { finalized_ = true; } // Update num_non_empty_levels_. @@ -197,7 +202,6 @@ class VersionStorageInfo { // REQUIRES: This version has been saved (see VersionSet::SaveTo) const std::vector& LevelFiles(int level) const { - assert(finalized_); return files_[level]; } @@ -249,8 +253,6 @@ class VersionStorageInfo { // in a specified level. Uses *scratch as backing store. const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; - std::vector* GetFiles() { return files_; } - // Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. int64_t MaxNextLevelOverlappingBytes(); @@ -269,7 +271,7 @@ class VersionStorageInfo { (accumulated_raw_key_size_ + accumulated_raw_value_size_); } - uint64_t GetEstimatedActiveKeys(); + uint64_t GetEstimatedActiveKeys() const; // re-initializes the index that is used to offset into files_by_size_ // to find the next compaction candidate file. @@ -277,6 +279,10 @@ class VersionStorageInfo { next_file_to_compact_by_size_[level] = 0; } + const InternalKeyComparator* InternalComparator() { + return internal_comparator_; + } + private: const InternalKeyComparator* internal_comparator_; const Comparator* user_comparator_; @@ -374,8 +380,6 @@ class Version { // and return true. Otherwise, return false. bool Unref(); - std::vector* GetFiles() { return vstorage_.GetFiles(); } - // Add all files listed in the current version to *live. void AddLiveFiles(std::vector* live); @@ -385,10 +389,6 @@ class Version { // Returns the version nuber of this version uint64_t GetVersionNumber() const { return version_number_; } - uint64_t GetAverageValueSize() const { - return vstorage_.GetAverageValueSize(); - } - // REQUIRES: lock is held // On success, "tp" will contains the table properties of the file // specified in "file_meta". If the file name of "file_meta" is @@ -405,7 +405,7 @@ class Version { Status GetPropertiesOfAllTables(TablePropertiesCollection* props); uint64_t GetEstimatedActiveKeys() { - return vstorage_.GetEstimatedActiveKeys(); + return storage_info_.GetEstimatedActiveKeys(); } size_t GetMemoryUsageByTableReaders(); @@ -418,16 +418,18 @@ class Version { return next_; } - VersionStorageInfo* GetStorageInfo() { return &vstorage_; } + VersionStorageInfo* storage_info() { return &storage_info_; } + + VersionSet* version_set() { return vset_; } private: friend class VersionSet; - const InternalKeyComparator* GetInternalComparator() const { - return vstorage_.internal_comparator_; + const InternalKeyComparator* internal_comparator() const { + return storage_info_.internal_comparator_; } - const Comparator* GetUserComparator() const { - return vstorage_.user_comparator_; + const Comparator* user_comparator() const { + return storage_info_.user_comparator_; } bool PrefixMayMatch(const ReadOptions& read_options, Iterator* level_iter, @@ -446,15 +448,13 @@ class Version { // record results in files_by_size_. The largest files are listed first. void UpdateFilesBySize(); - VersionSet* GetVersionSet() { return vset_; } - ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs Logger* info_log_; Statistics* db_statistics_; TableCache* table_cache_; const MergeOperator* merge_operator_; - VersionStorageInfo vstorage_; + VersionStorageInfo storage_info_; VersionSet* vset_; // VersionSet to which this Version belongs Version* next_; // Next version in linked list Version* prev_; // Previous version in linked list @@ -602,9 +602,9 @@ class VersionSet { void GetObsoleteFiles(std::vector* files); ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } + const EnvOptions& GetEnvOptions() { return env_options_; } private: - class Builder; struct ManifestWriter; friend class Version; @@ -664,7 +664,7 @@ class VersionSet { void operator=(const VersionSet&); void LogAndApplyCFHelper(VersionEdit* edit); - void LogAndApplyHelper(ColumnFamilyData* cfd, Builder* b, Version* v, + void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v, VersionEdit* edit, port::Mutex* mu); }; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 3ff31359b..618c10a35 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1125,7 +1125,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int max = -1; auto default_cfd = versions.GetColumnFamilySet()->GetDefault(); for (int i = 0; i < default_cfd->NumberLevels(); i++) { - if (default_cfd->current()->GetStorageInfo()->NumLevelFiles(i)) { + if (default_cfd->current()->storage_info()->NumLevelFiles(i)) { max = i; } } diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc index 455b312fa..3a417de2b 100644 --- a/utilities/compacted_db/compacted_db_impl.cc +++ b/utilities/compacted_db/compacted_db_impl.cc @@ -104,7 +104,7 @@ Status CompactedDBImpl::Init(const Options& options) { } version_ = cfd_->GetSuperVersion()->current; user_comparator_ = cfd_->user_comparator(); - auto* vstorage = version_->GetStorageInfo(); + auto* vstorage = version_->storage_info(); const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0); // L0 should not have files if (l0.num_files > 1) {