CF cleanup part 2
This commit is contained in:
parent
f071a20f6e
commit
f0e1e3ebf1
@ -1489,12 +1489,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
const ColumnFamilyOptions* options) {
|
||||
mu->AssertHeld();
|
||||
|
||||
assert(column_family_data != nullptr || edit->is_column_family_add_);
|
||||
|
||||
if (edit->is_column_family_drop_) {
|
||||
// if we drop column family, we have to make sure to save max column family,
|
||||
// so that we don't reuse existing ID
|
||||
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
|
||||
// column_family_data can be nullptr only if this is column_family_add.
|
||||
// in that case, we also need to specify ColumnFamilyOptions
|
||||
if (column_family_data == nullptr) {
|
||||
assert(edit->is_column_family_add_);
|
||||
assert(options != nullptr);
|
||||
}
|
||||
|
||||
// queue our request
|
||||
@ -1527,9 +1526,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
assert(manifest_writers_.front() == &w);
|
||||
if (edit->IsColumnFamilyManipulation()) {
|
||||
// no group commits for column family add or drop
|
||||
last_writer = &w;
|
||||
edit->SetNextFile(next_file_number_);
|
||||
edit->SetLastSequence(last_sequence_);
|
||||
LogAndApplyCFHelper(edit);
|
||||
batch_edits.push_back(edit);
|
||||
} else {
|
||||
v = new Version(column_family_data, this, current_version_number_++);
|
||||
@ -1567,6 +1564,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
if (new_descriptor_log) {
|
||||
new_manifest_filename = DescriptorFileName(dbname_, manifest_file_number_);
|
||||
edit->SetNextFile(next_file_number_);
|
||||
// if we're writing out new snapshot make sure to persist max column
|
||||
// family
|
||||
if (column_family_set_->GetMaxColumnFamily() > 0) {
|
||||
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
|
||||
}
|
||||
}
|
||||
|
||||
// Unlock during expensive operations. New writes cannot get here
|
||||
@ -1590,7 +1592,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
|
||||
// This is fine because everything inside of this block is serialized --
|
||||
// only one thread can be here at the same time
|
||||
if (!new_manifest_filename.empty()) {
|
||||
if (new_descriptor_log) {
|
||||
unique_ptr<WritableFile> descriptor_file;
|
||||
s = env_->NewWritableFile(new_manifest_filename, &descriptor_file,
|
||||
storage_options_.AdaptForLogWrite());
|
||||
@ -1725,16 +1727,22 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
||||
return s;
|
||||
}
|
||||
|
||||
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
|
||||
assert(edit->IsColumnFamilyManipulation());
|
||||
edit->SetNextFile(next_file_number_);
|
||||
edit->SetLastSequence(last_sequence_);
|
||||
if (edit->is_column_family_drop_) {
|
||||
// if we drop column family, we have to make sure to save max column family,
|
||||
// so that we don't reuse existing ID
|
||||
edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
|
||||
}
|
||||
}
|
||||
|
||||
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder,
|
||||
Version* v, VersionEdit* edit,
|
||||
port::Mutex* mu) {
|
||||
mu->AssertHeld();
|
||||
|
||||
if (!edit->has_prev_log_number_) {
|
||||
edit->SetPrevLogNumber(prev_log_number_);
|
||||
}
|
||||
edit->SetNextFile(next_file_number_);
|
||||
edit->SetLastSequence(last_sequence_);
|
||||
assert(!edit->IsColumnFamilyManipulation());
|
||||
|
||||
if (edit->has_log_number_) {
|
||||
assert(edit->log_number_ >= cfd->GetLogNumber());
|
||||
@ -1743,6 +1751,12 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder,
|
||||
}
|
||||
assert(edit->log_number_ < next_file_number_);
|
||||
|
||||
if (!edit->has_prev_log_number_) {
|
||||
edit->SetPrevLogNumber(prev_log_number_);
|
||||
}
|
||||
edit->SetNextFile(next_file_number_);
|
||||
edit->SetLastSequence(last_sequence_);
|
||||
|
||||
builder->Apply(edit);
|
||||
}
|
||||
|
||||
@ -1806,17 +1820,16 @@ Status VersionSet::Recover(
|
||||
std::unordered_map<uint32_t, Builder*> builders;
|
||||
|
||||
// add default column family
|
||||
auto default_cf_iter = cf_name_to_options.find(default_column_family_name);
|
||||
if (default_cf_iter == cf_name_to_options.end()) {
|
||||
return Status::InvalidArgument("Default column family not specified");
|
||||
}
|
||||
VersionEdit default_cf_edit;
|
||||
default_cf_edit.AddColumnFamily(default_column_family_name);
|
||||
default_cf_edit.SetColumnFamily(0);
|
||||
auto default_cf_iter = cf_name_to_options.find(default_column_family_name);
|
||||
if (default_cf_iter == cf_name_to_options.end()) {
|
||||
column_families_not_found.insert(0);
|
||||
} else {
|
||||
ColumnFamilyData* default_cfd =
|
||||
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
|
||||
builders.insert({0, new Builder(default_cfd)});
|
||||
}
|
||||
|
||||
{
|
||||
VersionSet::LogReporter reporter;
|
||||
@ -2357,9 +2370,11 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
||||
Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||
// TODO: Break up into multiple records to reduce memory usage on recovery?
|
||||
|
||||
// WARNING: This method doesn't hold a mutex!!
|
||||
|
||||
// This is done without DB mutex lock held, but only within single-threaded
|
||||
// LogAndApply. Column family manipulations can only happen within LogAndApply
|
||||
// (the same single thread), so we're safe
|
||||
// (the same single thread), so we're safe to iterate.
|
||||
for (auto cfd : *column_family_set_) {
|
||||
{
|
||||
// Store column family info
|
||||
@ -2406,18 +2421,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// persist max column family, last sequence and next file
|
||||
VersionEdit edit;
|
||||
if (column_family_set_->GetMaxColumnFamily() > 0) {
|
||||
edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
|
||||
}
|
||||
edit.SetLastSequence(last_sequence_);
|
||||
edit.SetNextFile(next_file_number_);
|
||||
std::string record;
|
||||
edit.EncodeTo(&record);
|
||||
return log->AddRecord(record);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Opens the mainfest file and reads all records
|
||||
|
@ -466,6 +466,7 @@ class VersionSet {
|
||||
VersionSet(const VersionSet&);
|
||||
void operator=(const VersionSet&);
|
||||
|
||||
void LogAndApplyCFHelper(VersionEdit* edit);
|
||||
void LogAndApplyHelper(ColumnFamilyData* cfd, Builder* b, Version* v,
|
||||
VersionEdit* edit, port::Mutex* mu);
|
||||
};
|
||||
|
@ -177,7 +177,6 @@ void WriteBatch::Put(uint32_t column_family_id, const Slice& key,
|
||||
const Slice& value) {
|
||||
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
|
||||
if (column_family_id == 0) {
|
||||
// save some data on disk by not writing default column family
|
||||
rep_.push_back(static_cast<char>(kTypeValue));
|
||||
} else {
|
||||
rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
|
||||
|
Loading…
Reference in New Issue
Block a user