Refactor to use VersionSet [CF + RepairDB part 1/3]

Summary:
To support column families, it is easiest to use VersionSet to manage
our column families (if we don't have Versions then ColumnFamilyData always
behaves as a dummy column family). This diff only refactors the existing repair
logic to use VersionSet; the next two parts will add support for multiple
column families.

Test Plan:
  $ ./repair_test

Reviewers: yhchiang, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D59775
This commit is contained in:
Andrew Kryczka 2016-06-24 11:19:40 -07:00
parent aa432be4b9
commit 343507afb1
3 changed files with 80 additions and 67 deletions

View File

@ -491,6 +491,8 @@ class DBImpl : public DB {
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
void MarkLogAsContainingPrepSection(uint64_t log);
Status NewDB();
protected:
Env* const env_;
const std::string dbname_;
@ -559,8 +561,6 @@ class DBImpl : public DB {
struct PurgeFileInfo;
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.

View File

@ -94,32 +94,69 @@ class Repairer {
: dbname_(dbname),
env_(options.env),
icmp_(options.comparator),
cf_options_(options),
options_(SanitizeOptions(dbname, &icmp_, options)),
ioptions_(options_),
env_options_(),
raw_table_cache_(
// TableCache can be small since we expect each table to be opened
// once.
NewLRUCache(10, options_.table_cache_numshardbits)),
table_cache_(
new TableCache(ioptions_, env_options_, raw_table_cache_.get())),
wb_(options_.db_write_buffer_size),
wc_(options_.delayed_write_rate),
vset_(dbname_, &options_, env_options_, raw_table_cache_.get(), &wb_,
&wc_),
next_file_number_(1) {
GetIntTblPropCollectorFactory(options, &int_tbl_prop_collector_factories_);
}
table_cache_ =
new TableCache(ioptions_, env_options_, raw_table_cache_.get());
edit_ = new VersionEdit();
// Adds a column family to the VersionSet with cf_options_ and updates
// manifest.
Status AddColumnFamily(const std::string& cf_name, uint32_t cf_id) {
MutableCFOptions mut_cf_opts(options_, ioptions_);
VersionEdit edit;
edit.SetComparatorName(icmp_.user_comparator()->Name());
edit.SetLogNumber(0);
edit.SetColumnFamily(cf_id);
ColumnFamilyData* cfd;
cfd = nullptr;
edit.AddColumnFamily(cf_name);
mutex_.Lock();
Status status = vset_.LogAndApply(
cfd, mut_cf_opts, &edit, &mutex_, nullptr /* db_directory */,
false /* new_descriptor_log */, &cf_options_);
mutex_.Unlock();
return status;
}
~Repairer() {
delete table_cache_;
raw_table_cache_.reset();
delete edit_;
}
Status Run() {
Status status = FindFiles();
if (status.ok()) {
// Discard older manifests and start a fresh one
for (size_t i = 0; i < manifests_.size(); i++) {
ArchiveFile(dbname_ + "/" + manifests_[i]);
}
// Just create a DBImpl temporarily so we can reuse NewDB()
DBImpl* db_impl = new DBImpl(options_, dbname_);
status = db_impl->NewDB();
delete db_impl;
}
if (status.ok()) {
// Recover using the fresh manifest created by NewDB()
status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false);
}
if (status.ok()) {
ConvertLogFilesToTables();
ExtractMetaData();
status = WriteDescriptor();
status = AddTables();
}
if (status.ok()) {
uint64_t bytes = 0;
@ -149,18 +186,22 @@ class Repairer {
const InternalKeyComparator icmp_;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories_;
const Options options_;
const ImmutableCFOptions ioptions_;
const ColumnFamilyOptions cf_options_; // unsanitized
const Options options_; // sanitized
const ImmutableCFOptions ioptions_; // sanitized
const EnvOptions env_options_;
std::shared_ptr<Cache> raw_table_cache_;
TableCache* table_cache_;
VersionEdit* edit_;
WriteBuffer wb_;
WriteController wc_;
VersionSet vset_;
InstrumentedMutex mutex_;
std::vector<std::string> manifests_;
std::vector<FileDescriptor> table_fds_;
std::vector<uint64_t> logs_;
std::vector<TableInfo> tables_;
uint64_t next_file_number_;
const EnvOptions env_options_;
Status FindFiles() {
std::vector<std::string> filenames;
@ -246,7 +287,7 @@ class Repairer {
reporter.env = env_;
reporter.info_log = options_.info_log;
reporter.lognum = log;
// We intentially make log::Reader do checksumming so that
// We intentionally make log::Reader do checksumming so that
// corruptions cause entire commits to be skipped instead of
// propagating bad information (like overly large sequence
// numbers).
@ -384,62 +425,34 @@ class Repairer {
return status;
}
Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1);
unique_ptr<WritableFile> file;
EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
Status status = env_->NewWritableFile(tmp, &file, env_options);
if (!status.ok()) {
return status;
}
Status AddTables() {
SequenceNumber max_sequence = 0;
for (size_t i = 0; i < tables_.size(); i++) {
if (max_sequence < tables_[i].max_sequence) {
max_sequence = tables_[i].max_sequence;
}
}
vset_.SetLastSequence(max_sequence);
edit_->SetComparatorName(icmp_.user_comparator()->Name());
edit_->SetLogNumber(0);
edit_->SetNextFile(next_file_number_);
edit_->SetLastSequence(max_sequence);
auto* cfd = vset_.GetColumnFamilySet()->GetDefault();
VersionEdit edit;
edit.SetComparatorName(cfd->user_comparator()->Name());
edit.SetLogNumber(0);
edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID());
for (size_t i = 0; i < tables_.size(); i++) {
// TODO(opt): separate out into multiple levels
const TableInfo& t = tables_[i];
edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest,
t.min_sequence, t.max_sequence,
t.meta.marked_for_compaction);
}
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
{
unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), env_options));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
edit_->EncodeTo(&record);
status = log.AddRecord(record);
}
if (!status.ok()) {
env_->DeleteFile(tmp);
} else {
// Discard older manifests
for (size_t i = 0; i < manifests_.size(); i++) {
ArchiveFile(dbname_ + "/" + manifests_[i]);
}
// Install new manifest
status = env_->RenameFile(tmp, DescriptorFileName(dbname_, 1));
if (status.ok()) {
status = SetCurrentFile(env_, dbname_, 1, nullptr);
} else {
env_->DeleteFile(tmp);
}
// TODO(opt): separate out into multiple levels
for (const auto& table : tables_) {
edit.AddFile(0, table.meta.fd.GetNumber(), table.meta.fd.GetPathId(),
table.meta.fd.GetFileSize(), table.meta.smallest,
table.meta.largest, table.min_sequence, table.max_sequence,
table.meta.marked_for_compaction);
}
mutex_.Lock();
Status status = vset_.LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
nullptr /* db_directory */, false /* new_descriptor_log */);
mutex_.Unlock();
return status;
}
@ -464,7 +477,7 @@ class Repairer {
fname.c_str(), s.ToString().c_str());
}
};
} // namespace
} // anonymous namespace
Status RepairDB(const std::string& dbname, const Options& options) {
Repairer repairer(dbname, options);

View File

@ -49,7 +49,7 @@ TEST_F(RepairTest, LostManifest) {
Close();
ASSERT_OK(env_->FileExists(manifest_path));
ASSERT_OK(env_->DeleteFile(manifest_path));
RepairDB(dbname_, CurrentOptions());
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
ASSERT_EQ(Get("key"), "val");
@ -70,7 +70,7 @@ TEST_F(RepairTest, CorruptManifest) {
Close();
ASSERT_OK(env_->FileExists(manifest_path));
CreateFile(env_, manifest_path, "blah");
RepairDB(dbname_, CurrentOptions());
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
ASSERT_EQ(Get("key"), "val");
@ -96,7 +96,7 @@ TEST_F(RepairTest, IncompleteManifest) {
ASSERT_OK(env_->FileExists(new_manifest_path));
// Replace the manifest with one that is only aware of the first SST file.
CopyFile(orig_manifest_path + ".tmp", new_manifest_path);
RepairDB(dbname_, CurrentOptions());
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
ASSERT_EQ(Get("key"), "val");
@ -115,7 +115,7 @@ TEST_F(RepairTest, LostSst) {
ASSERT_OK(env_->DeleteFile(sst_path));
Close();
RepairDB(dbname_, CurrentOptions());
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
// Exactly one of the key-value pairs should be in the DB now.
@ -134,7 +134,7 @@ TEST_F(RepairTest, CorruptSst) {
CreateFile(env_, sst_path, "blah");
Close();
RepairDB(dbname_, CurrentOptions());
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
// Exactly one of the key-value pairs should be in the DB now.
@ -159,7 +159,7 @@ TEST_F(RepairTest, UnflushedSst) {
Close();
ASSERT_OK(env_->FileExists(manifest_path));
ASSERT_OK(env_->DeleteFile(manifest_path));
RepairDB(dbname_, CurrentOptions());
ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
Reopen(CurrentOptions());
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));