[CF] Rething LogAndApply for column families

Summary:
I though I might get away with as little changes to LogAndApply() as possible. It turns out this is not the case.

This diff introduces different behavior of LogAndApply() for three cases:
1. column family add
2. column family drop
3. no-column family manipulation

(1) and (2) don't support group commit yet.

There were a lot of problems with old version od LogAndApply, detected by db_stress. The biggest was non-atomicity of manifest writes and metadata changes (i.e. if column family add is in manifest, it also has to be in in-memory data structure).

Test Plan: db_stress

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16491
This commit is contained in:
Igor Canadi 2014-02-28 14:05:11 -08:00
parent 12966ec1bb
commit 8ea21a778b
6 changed files with 104 additions and 51 deletions

View File

@ -333,6 +333,15 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
}
}
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
const {
auto cfd_iter = column_families_.find(name);
if (cfd_iter == column_families_.end()) {
return nullptr;
}
return GetColumnFamily(cfd_iter->second);
}
bool ColumnFamilySet::Exists(uint32_t id) {
return column_family_data_.find(id) != column_family_data_.end();
}

View File

@ -264,6 +264,7 @@ class ColumnFamilySet {
ColumnFamilyData* GetDefault() const;
// GetColumnFamily() calls return nullptr if column family is not found
ColumnFamilyData* GetColumnFamily(uint32_t id) const;
ColumnFamilyData* GetColumnFamily(const std::string& name) const;
bool Exists(uint32_t id);
bool Exists(const std::string& name);
uint32_t GetID(const std::string& name);

View File

@ -3068,9 +3068,12 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
edit.SetLogNumber(logfile_number_);
edit.SetComparatorName(options.comparator->Name());
Status s = versions_->LogAndApply(default_cf_handle_->cfd(), &edit, &mutex_);
Status s = versions_->LogAndApply(nullptr, &edit, &mutex_,
db_directory_.get(), false, &options);
if (s.ok()) {
auto cfd = versions_->CreateColumnFamily(options, &edit);
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(options_.info_log, "Created column family \"%s\" (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
@ -3098,16 +3101,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
s = Status::InvalidArgument("Column family already dropped!\n");
}
if (s.ok()) {
cfd->SetDropped();
s = versions_->LogAndApply(cfd, &edit, &mutex_);
}
if (s.ok()) {
// DB is holding one reference to each column family when it's alive,
// need to drop it now
if (cfd->Unref()) {
delete cfd;
}
}
if (s.ok()) {
Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID());

View File

@ -101,6 +101,10 @@ class VersionEdit {
return new_files_.size() + deleted_files_.size();
}
bool IsColumnFamilyManipulation() {
return is_column_family_add_ || is_column_family_drop_;
}
void SetColumnFamily(uint32_t column_family_id) {
column_family_ = column_family_id;
}

View File

@ -1485,15 +1485,20 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
VersionEdit* edit, port::Mutex* mu,
Directory* db_directory,
bool new_descriptor_log) {
Directory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* options) {
mu->AssertHeld();
if (column_family_data->IsDropped() && !edit->is_column_family_drop_) {
assert(column_family_data != nullptr || edit->is_column_family_add_);
if (column_family_data != nullptr && column_family_data->IsDropped()) {
// if column family is dropped no need to write anything to the manifest
// (unless, of course, thit is the drop column family write)
return Status::OK();
}
if (edit->is_column_family_drop_) {
column_family_data->SetDropped();
}
// queue our request
ManifestWriter w(mu, column_family_data, edit);
@ -1506,23 +1511,36 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
std::vector<VersionEdit*> batch_edits;
Version* v = new Version(column_family_data, this, current_version_number_++);
Builder builder(column_family_data);
Version* v = nullptr;
std::unique_ptr<Builder> builder(nullptr);
// process all requests in the queue
ManifestWriter* last_writer = &w;
assert(!manifest_writers_.empty());
assert(manifest_writers_.front() == &w);
for (const auto& writer : manifest_writers_) {
if (writer->cfd->GetID() != column_family_data->GetID()) {
// group commits across column families are not yet supported
break;
if (edit->IsColumnFamilyManipulation()) {
// no group commits for column family add or drop
last_writer = &w;
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
batch_edits.push_back(edit);
} else {
v = new Version(column_family_data, this, current_version_number_++);
builder.reset(new Builder(column_family_data));
for (const auto& writer : manifest_writers_) {
if (writer->edit->IsColumnFamilyManipulation() ||
writer->cfd->GetID() != column_family_data->GetID()) {
// no group commits for column family add or drop
// also, group commits across column families are not supported
break;
}
last_writer = writer;
LogAndApplyHelper(column_family_data, builder.get(), v, last_writer->edit,
mu);
batch_edits.push_back(last_writer->edit);
}
last_writer = writer;
LogAndApplyHelper(column_family_data, &builder, v, last_writer->edit, mu);
batch_edits.push_back(last_writer->edit);
builder->SaveTo(v);
}
builder.SaveTo(v);
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
@ -1547,17 +1565,20 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Unlock during expensive operations. New writes cannot get here
// because &w is ensuring that all new writes get queued.
{
// calculate the amount of data being compacted at every level
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
column_family_data->compaction_picker()->SizeBeingCompacted(
size_being_compacted);
std::vector<uint64_t> size_being_compacted;
if (!edit->IsColumnFamilyManipulation()) {
size_being_compacted.resize(v->NumberLevels() - 1);
// calculate the amount of data being compacted at every level
column_family_data->compaction_picker()->SizeBeingCompacted(
size_being_compacted);
}
mu->Unlock();
if (options_->max_open_files == -1) {
if (!edit->IsColumnFamilyManipulation() && options_->max_open_files == -1) {
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
builder.LoadTableHandlers();
builder->LoadTableHandlers();
}
// This is fine because everything inside of this block is serialized --
@ -1573,10 +1594,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
}
}
// The calls to Finalize and UpdateFilesBySize are cpu-heavy
// and is best called outside the mutex.
v->Finalize(size_being_compacted);
v->UpdateFilesBySize();
if (!edit->IsColumnFamilyManipulation()) {
// The calls to Finalize and UpdateFilesBySize are cpu-heavy
// and is best called outside the mutex.
v->Finalize(size_being_compacted);
v->UpdateFilesBySize();
}
// Write new record to MANIFEST log
if (s.ok()) {
@ -1650,11 +1673,23 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
// Install the new version
if (s.ok()) {
manifest_file_size_ = new_manifest_file_size;
AppendVersion(column_family_data, v);
column_family_data->SetLogNumber(edit->log_number_);
prev_log_number_ = edit->prev_log_number_;
if (edit->is_column_family_add_) {
// no group commit on column family add
assert(batch_edits.size() == 1);
assert(options != nullptr);
CreateColumnFamily(*options, edit);
} else if (edit->is_column_family_drop_) {
assert(batch_edits.size() == 1);
if (column_family_data->Unref()) {
delete column_family_data;
}
} else {
column_family_data->SetLogNumber(batch_edits.back()->log_number_);
AppendVersion(column_family_data, v);
}
manifest_file_size_ = new_manifest_file_size;
prev_log_number_ = edit->prev_log_number_;
} else {
Log(options_->info_log, "Error in committing version %lu",
(unsigned long)v->GetVersionNumber());
@ -1694,19 +1729,14 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder,
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
if (edit->is_column_family_add_) {
assert(edit->has_log_number_);
if (edit->has_log_number_) {
assert(edit->log_number_ >= cfd->GetLogNumber());
} else {
if (edit->has_log_number_) {
assert(edit->log_number_ >= cfd->GetLogNumber());
} else {
edit->SetLogNumber(cfd->GetLogNumber());
}
builder->Apply(edit);
edit->SetLogNumber(cfd->GetLogNumber());
}
assert(edit->log_number_ < next_file_number_);
builder->Apply(edit);
}
Status VersionSet::Recover(
@ -2013,9 +2043,20 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
break;
}
if (edit.is_column_family_add_) {
if (column_family_names.find(edit.column_family_) !=
column_family_names.end()) {
s = Status::Corruption("Manifest adding the same column family twice");
break;
}
column_family_names.insert(
{edit.column_family_, edit.column_family_name_});
} else if (edit.is_column_family_drop_) {
if (column_family_names.find(edit.column_family_) ==
column_family_names.end()) {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
column_family_names.erase(edit.column_family_);
}
}

View File

@ -296,11 +296,14 @@ class VersionSet {
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
// current version. Will release *mu while actually writing to the file.
// column_family_options has to be set if edit is column family add
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit,
port::Mutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false);
bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options =
nullptr);
// Recover the last saved descriptor from persistent storage.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families);
@ -401,9 +404,6 @@ class VersionSet {
void GetObsoleteFiles(std::vector<FileMetaData*>* files);
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options,
VersionEdit* edit);
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
private:
@ -426,6 +426,9 @@ class VersionSet {
bool ManifestContains(const std::string& record) const;
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options,
VersionEdit* edit);
std::unique_ptr<ColumnFamilySet> column_family_set_;
Env* const env_;