[column families] Removing VersionSet::current()

Summary: Instead of VersionSet::current(), DBImpl uses default_cfd_->current directly.

Test Plan: make check

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15483
This commit is contained in:
Igor Canadi 2014-01-27 14:33:50 -08:00
parent 511b03a5b5
commit 4bf25357ae
8 changed files with 48 additions and 53 deletions

View File

@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// Make a set of all of the live *.sst files // Make a set of all of the live *.sst files
std::set<uint64_t> live; std::set<uint64_t> live;
versions_->current()->AddLiveFiles(&live); default_cfd_->current->AddLiveFiles(&live);
ret.clear(); ret.clear();
ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST

View File

@ -1079,7 +1079,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable = const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber(); mems[0]->GetFirstSequenceNumber();
Version* base = versions_->current(); Version* base = default_cfd_->current;
base->Ref(); // it is likely that we do not need this reference base->Ref(); // it is likely that we do not need this reference
Status s; Status s;
{ {
@ -1116,7 +1116,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
// re-acquire the most current version // re-acquire the most current version
base = versions_->current(); base = default_cfd_->current;
// There could be multiple threads writing to its own level-0 file. // There could be multiple threads writing to its own level-0 file.
// The pending_outputs cannot be cleared here, otherwise this newly // The pending_outputs cannot be cleared here, otherwise this newly
@ -1239,7 +1239,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
int max_level_with_files = 1; int max_level_with_files = 1;
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
Version* base = versions_->current(); Version* base = default_cfd_->current;
for (int level = 1; level < NumberLevels(); level++) { for (int level = 1; level < NumberLevels(); level++) {
if (base->OverlapInLevel(level, begin, end)) { if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level; max_level_with_files = level;
@ -1272,7 +1272,7 @@ Status DBImpl::CompactRange(const ColumnFamilyHandle& column_family,
// return the same level if it cannot be moved // return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(int level) { int DBImpl::FindMinimumEmptyLevelFitting(int level) {
mutex_.AssertHeld(); mutex_.AssertHeld();
Version* current = versions_->current(); Version* current = default_cfd_->current;
int minimum_level = level; int minimum_level = level;
for (int i = level - 1; i > 0; --i) { for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty // stop if level i is not empty
@ -1323,10 +1323,10 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
Status status; Status status;
if (to_level < level) { if (to_level < level) {
Log(options_.info_log, "Before refitting:\n%s", Log(options_.info_log, "Before refitting:\n%s",
versions_->current()->DebugString().data()); default_cfd_->current->DebugString().data());
VersionEdit edit; VersionEdit edit;
for (const auto& f : versions_->current()->files_[level]) { for (const auto& f : default_cfd_->current->files_[level]) {
edit.DeleteFile(level, f->number); edit.DeleteFile(level, f->number);
edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno); f->smallest_seqno, f->largest_seqno);
@ -1343,7 +1343,7 @@ Status DBImpl::ReFitLevel(int level, int target_level) {
if (status.ok()) { if (status.ok()) {
Log(options_.info_log, "After refitting:\n%s", Log(options_.info_log, "After refitting:\n%s",
versions_->current()->DebugString().data()); default_cfd_->current->DebugString().data());
} }
} }
@ -1714,8 +1714,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// flush, but the HIGH pool is not enabled). Do it only if // flush, but the HIGH pool is not enabled). Do it only if
// max_background_compactions hasn't been reached and, in case // max_background_compactions hasn't been reached and, in case
// bg_manual_only_ > 0, if it's a manual compaction. // bg_manual_only_ > 0, if it's a manual compaction.
if ((manual_compaction_ || if ((manual_compaction_ || default_cfd_->current->NeedsCompaction() ||
versions_->current()->NeedsCompaction() ||
(is_flush_pending && (options_.max_background_flushes <= 0))) && (is_flush_pending && (options_.max_background_flushes <= 0))) &&
bg_compaction_scheduled_ < options_.max_background_compactions && bg_compaction_scheduled_ < options_.max_background_compactions &&
(!bg_manual_only_ || manual_compaction_)) { (!bg_manual_only_ || manual_compaction_)) {
@ -1794,7 +1793,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
uint64_t DBImpl::TEST_GetLevel0TotalSize() { uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
return versions_->current()->NumLevelBytes(0); return default_cfd_->current->NumLevelBytes(0);
} }
void DBImpl::BackgroundCallCompaction() { void DBImpl::BackgroundCallCompaction() {
@ -1916,7 +1915,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size), static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(), versions_->current()->LevelSummary(&tmp)); status.ToString().c_str(), default_cfd_->current->LevelSummary(&tmp));
versions_->ReleaseCompactionFiles(c.get(), status); versions_->ReleaseCompactionFiles(c.get(), status);
*madeProgress = true; *madeProgress = true;
} else { } else {
@ -2206,7 +2205,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compact->compaction->Summary(scratch, sizeof(scratch)); compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch); Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->compaction->input_version()->NumLevelFiles(
compact->compaction->level()) > 0);
assert(compact->builder == nullptr); assert(compact->builder == nullptr);
assert(!compact->outfile); assert(!compact->outfile);
@ -2584,7 +2584,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
"compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s\n", "write-amplify(%.1f) %s\n",
versions_->current()->LevelSummary(&tmp), compact->compaction->input_version()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) / (stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
(double)stats.micros, (double)stats.micros,
compact->compaction->output_level(), stats.files_in_leveln, compact->compaction->output_level(), stats.files_in_leveln,
@ -2648,8 +2648,8 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
mutable_mem->Ref(); mutable_mem->Ref();
immutable_mems = default_cfd_->imm.current(); immutable_mems = default_cfd_->imm.current();
immutable_mems->Ref(); immutable_mems->Ref();
versions_->current()->Ref(); version = default_cfd_->current;
version = versions_->current(); version->Ref();
mutex_.Unlock(); mutex_.Unlock();
std::vector<Iterator*> iterator_list; std::vector<Iterator*> iterator_list;
@ -2689,7 +2689,7 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
mutable_mem->Ref(); mutable_mem->Ref();
immutable_mems = default_cfd_->imm.current(); immutable_mems = default_cfd_->imm.current();
immutable_mems->Ref(); immutable_mems->Ref();
version = versions_->current(); version = default_cfd_->current;
version->Ref(); version->Ref();
if (superversion_number != nullptr) { if (superversion_number != nullptr) {
*superversion_number = CurrentVersionNumber(); *superversion_number = CurrentVersionNumber();
@ -2731,7 +2731,7 @@ std::pair<Iterator*, Iterator*> DBImpl::GetTailingIteratorPair(
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
return versions_->current()->MaxNextLevelOverlappingBytes(); return default_cfd_->current->MaxNextLevelOverlappingBytes();
} }
Status DBImpl::Get(const ReadOptions& options, Status DBImpl::Get(const ReadOptions& options,
@ -3299,7 +3299,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
// this delay hands over some CPU to the compaction thread in // this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer. // case it is sharing the same core as the writer.
uint64_t slowdown = uint64_t slowdown =
SlowdownAmount(versions_->current()->NumLevelFiles(0), SlowdownAmount(default_cfd_->current->NumLevelFiles(0),
options_.level0_slowdown_writes_trigger, options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger); options_.level0_stop_writes_trigger);
mutex_.Unlock(); mutex_.Unlock();
@ -3339,7 +3339,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
STALL_MEMTABLE_COMPACTION_MICROS, stall); STALL_MEMTABLE_COMPACTION_MICROS, stall);
stall_memtable_compaction_ += stall; stall_memtable_compaction_ += stall;
stall_memtable_compaction_count_++; stall_memtable_compaction_count_++;
} else if (versions_->current()->NumLevelFiles(0) >= } else if (default_cfd_->current->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) { options_.level0_stop_writes_trigger) {
// There are too many level-0 files. // There are too many level-0 files.
DelayLoggingAndReset(); DelayLoggingAndReset();
@ -3355,10 +3355,10 @@ Status DBImpl::MakeRoomForWrite(bool force,
stall_level0_num_files_ += stall; stall_level0_num_files_ += stall;
stall_level0_num_files_count_++; stall_level0_num_files_count_++;
} else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 && } else if (allow_hard_rate_limit_delay && options_.hard_rate_limit > 1.0 &&
(score = versions_->current()->MaxCompactionScore()) > (score = default_cfd_->current->MaxCompactionScore()) >
options_.hard_rate_limit) { options_.hard_rate_limit) {
// Delay a write when the compaction score for any level is too large. // Delay a write when the compaction score for any level is too large.
int max_level = versions_->current()->MaxCompactionScoreLevel(); int max_level = default_cfd_->current->MaxCompactionScoreLevel();
mutex_.Unlock(); mutex_.Unlock();
uint64_t delayed; uint64_t delayed;
{ {
@ -3381,7 +3381,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
} }
mutex_.Lock(); mutex_.Lock();
} else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 && } else if (allow_soft_rate_limit_delay && options_.soft_rate_limit > 0.0 &&
(score = versions_->current()->MaxCompactionScore()) > (score = default_cfd_->current->MaxCompactionScore()) >
options_.soft_rate_limit) { options_.soft_rate_limit) {
// Delay a write when the compaction score for any level is too large. // Delay a write when the compaction score for any level is too large.
// TODO: add statistics // TODO: add statistics
@ -3470,7 +3470,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
value->clear(); value->clear();
MutexLock l(&mutex_); MutexLock l(&mutex_);
Version* current = versions_->current(); Version* current = default_cfd_->current;
Slice in = property; Slice in = property;
Slice prefix("rocksdb."); Slice prefix("rocksdb.");
if (!in.starts_with(prefix)) return false; if (!in.starts_with(prefix)) return false;
@ -3734,7 +3734,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family,
return true; return true;
} else if (in == "sstables") { } else if (in == "sstables") {
*value = versions_->current()->DebugString(); *value = default_cfd_->current->DebugString();
return true; return true;
} else if (in == "num-immutable-mem-table") { } else if (in == "num-immutable-mem-table") {
*value = std::to_string(default_cfd_->imm.size()); *value = std::to_string(default_cfd_->imm.size());
@ -3750,8 +3750,8 @@ void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family,
Version* v; Version* v;
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
versions_->current()->Ref(); v = default_cfd_->current;
v = versions_->current(); v->Ref();
} }
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
@ -3803,11 +3803,12 @@ Status DBImpl::DeleteFile(std::string name) {
int level; int level;
FileMetaData metadata; FileMetaData metadata;
int maxlevel = NumberLevels(); int maxlevel = NumberLevels();
ColumnFamilyData* cfd;
VersionEdit edit; VersionEdit edit;
DeletionState deletion_state(0, true); DeletionState deletion_state(0, true);
{ {
MutexLock l(&mutex_); MutexLock l(&mutex_);
status = versions_->GetMetadataForFile(number, &level, &metadata); status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
if (!status.ok()) { if (!status.ok()) {
Log(options_.info_log, "DeleteFile %s failed. File not found\n", Log(options_.info_log, "DeleteFile %s failed. File not found\n",
name.c_str()); name.c_str());
@ -3826,17 +3827,16 @@ Status DBImpl::DeleteFile(std::string name) {
// This is to make sure that any deletion tombstones are not // This is to make sure that any deletion tombstones are not
// lost. Check that the level passed is the last level. // lost. Check that the level passed is the last level.
for (int i = level + 1; i < maxlevel; i++) { for (int i = level + 1; i < maxlevel; i++) {
if (versions_->current()->NumLevelFiles(i) != 0) { if (cfd->current->NumLevelFiles(i) != 0) {
Log(options_.info_log, Log(options_.info_log,
"DeleteFile %s FAILED. File not in last level\n", name.c_str()); "DeleteFile %s FAILED. File not in last level\n", name.c_str());
return Status::InvalidArgument("File not in last level"); return Status::InvalidArgument("File not in last level");
} }
} }
edit.DeleteFile(level, number); edit.DeleteFile(level, number);
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_, status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
db_directory_.get());
if (status.ok()) { if (status.ok()) {
InstallSuperVersion(default_cfd_, deletion_state); InstallSuperVersion(cfd, deletion_state);
} }
FindObsoleteFiles(deletion_state, false); FindObsoleteFiles(deletion_state, false);
} // lock released here } // lock released here
@ -4010,7 +4010,7 @@ Status DB::OpenWithColumnFamilies(
} }
if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) { if (s.ok() && impl->options_.compaction_style == kCompactionStyleUniversal) {
Version* current = impl->versions_->current(); Version* current = impl->default_cfd_->current;
for (int i = 1; i < impl->NumberLevels(); i++) { for (int i = 1; i < impl->NumberLevels(); i++) {
int num_files = current->NumLevelFiles(i); int num_files = current->NumLevelFiles(i);
if (num_files > 0) { if (num_files > 0) {

View File

@ -258,10 +258,7 @@ class DBImpl : public DB {
return internal_comparator_.user_comparator(); return internal_comparator_.user_comparator();
} }
MemTable* GetMemTable() { ColumnFamilyData* GetDefaultColumnFamily() { return default_cfd_; }
// TODO currently only works for default column family
return default_cfd_->mem;
}
Iterator* NewInternalIterator(const ReadOptions&, Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot); SequenceNumber* latest_snapshot);

View File

@ -56,8 +56,8 @@ Status DBImplReadOnly::Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value) { const Slice& key, std::string* value) {
Status s; Status s;
MemTable* mem = GetMemTable(); MemTable* mem = GetDefaultColumnFamily()->mem;
Version* current = versions_->current(); Version* current = GetDefaultColumnFamily()->current;
SequenceNumber snapshot = versions_->LastSequence(); SequenceNumber snapshot = versions_->LastSequence();
MergeContext merge_context; MergeContext merge_context;
LookupKey lkey(key, snapshot); LookupKey lkey(key, snapshot);

View File

@ -65,7 +65,7 @@ void DBImpl::LogDBDeployStats() {
uint64_t file_total_size = 0; uint64_t file_total_size = 0;
uint32_t file_total_num = 0; uint32_t file_total_num = 0;
Version* current = versions_->current(); Version* current = default_cfd_->current;
for (int i = 0; i < current->NumberLevels(); i++) { for (int i = 0; i < current->NumberLevels(); i++) {
file_total_num += current->NumLevelFiles(i); file_total_num += current->NumLevelFiles(i);
file_total_size += current->NumLevelBytes(i); file_total_size += current->NumLevelBytes(i);

View File

@ -1923,7 +1923,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
return status; return status;
} }
Version* current_version = versions.current(); Version* current_version =
versions.GetColumnFamilySet()->GetDefault()->current;
int current_levels = current_version->NumberLevels(); int current_levels = current_version->NumberLevels();
if (current_levels <= new_levels) { if (current_levels <= new_levels) {
@ -2357,14 +2358,16 @@ void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
} }
Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData* meta) { FileMetaData* meta,
for (auto cfd : *column_family_set_) { ColumnFamilyData** cfd) {
Version* version = cfd->current; for (auto cfd_iter : *column_family_set_) {
Version* version = cfd_iter->current;
for (int level = 0; level < version->NumberLevels(); level++) { for (int level = 0; level < version->NumberLevels(); level++) {
for (const auto& file : version->files_[level]) { for (const auto& file : version->files_[level]) {
if (file->number == number) { if (file->number == number) {
*meta = *file; *meta = *file;
*filelevel = level; *filelevel = level;
*cfd = cfd_iter;
return Status::OK(); return Status::OK();
} }
} }

View File

@ -314,12 +314,6 @@ class VersionSet {
const EnvOptions& storage_options, const EnvOptions& storage_options,
int new_levels); int new_levels);
// Return the current version.
Version* current() const {
// TODO this only works for default column family now
return column_family_set_->GetDefault()->current;
}
// A Flag indicating whether write needs to slowdown because of there are // A Flag indicating whether write needs to slowdown because of there are
// too many number of level0 files. // too many number of level0 files.
bool NeedSlowdownForNumLevel0Files() const { bool NeedSlowdownForNumLevel0Files() const {
@ -418,8 +412,8 @@ class VersionSet {
void ReleaseCompactionFiles(Compaction* c, Status status); void ReleaseCompactionFiles(Compaction* c, Status status);
Status GetMetadataForFile( Status GetMetadataForFile(uint64_t number, int* filelevel,
uint64_t number, int *filelevel, FileMetaData *metadata); FileMetaData* metadata, ColumnFamilyData** cfd);
void GetLiveFilesMetaData( void GetLiveFilesMetaData(
std::vector<LiveFileMetaData> *metadata); std::vector<LiveFileMetaData> *metadata);

View File

@ -1026,8 +1026,9 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
return st; return st;
} }
int max = -1; int max = -1;
auto default_cfd = versions.GetColumnFamilySet()->GetDefault();
for (int i = 0; i < versions.NumberLevels(); i++) { for (int i = 0; i < versions.NumberLevels(); i++) {
if (versions.current()->NumLevelFiles(i)) { if (default_cfd->current->NumLevelFiles(i)) {
max = i; max = i;
} }
} }