Refactor Recover() code

Summary:
This diff does two things:
* Rethinks how we call Recover() with read_only option. Before, we call it with pointer to memtable where we'd like to apply those changes to. This memtable is set in db_impl_readonly.cc and it's actually DBImpl::mem_. Why don't we just apply updates to mem_ right away? It seems more intuitive.
* Changes when we apply updates to manifest. Before, the process is to recover all the logs, flush it to sst files and then do one giant commit that atomically adds all recovered sst files and sets the next log number. This works good enough, but causes some small troubles for my column family approach, since I can't have one VersionEdit apply to more than single column family[1]. The change here is to commit the files recovered from logs right away. Here is the state of the world before the change:
1. Recover log 5, add new sst files to edit
2. Recover log 7, add new sst files to edit
3. Recover log 8, add new sst files to edit
4. Commit all added sst files to manifest and mark log files 5, 7 and 8 as recoverd (via SetLogNumber(9) function)
After the change, we'll do:
1. Recover log 5, commit the new sst files and set log 5 as recovered
2. Recover log 7, commit the new sst files and set log 7 as recovered
3. Recover log 8, commit the new sst files and set log 8 as recovered

The added (small) benefit is that if we fail after (2), the new recovery will only have to recover log 8. In previous case, we'll have to restart the recovery from the beginning. The bigger benefit will be to enable easier integration of multiple column families in Recovery code path.

[1] I'm happy to dicuss this decison, but I believe this is the cleanest way to go. It also makes backward compatibility much easier. We don't have a requirement of adding multiple column families atomically.

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong

Reviewed By: kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15237
This commit is contained in:
Igor Canadi 2014-01-22 10:45:26 -08:00
parent 4036d58dc9
commit 6fe9b57748
4 changed files with 150 additions and 62 deletions

View File

@ -860,14 +860,11 @@ void DBImpl::PurgeObsoleteWALFiles() {
}
}
// If externalTable is set, then apply recovered transactions
// to that table. This is used for readonly mode.
Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
bool error_if_log_file_exist) {
Status DBImpl::Recover(bool read_only, bool error_if_log_file_exist) {
mutex_.AssertHeld();
assert(db_lock_ == nullptr);
if (!external_table) {
if (!read_only) {
// We call CreateDirIfMissing() as the directory may already exist (if we
// are reopening a DB), when this happens we don't want creating the
// directory to cause an error. However, we need to check if creating the
@ -948,12 +945,12 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence, external_table);
for (size_t i = 0; s.ok() && i < logs.size(); i++) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
s = RecoverLogFile(logs[i], &max_sequence, read_only);
}
if (s.ok()) {
@ -968,10 +965,8 @@ Status DBImpl::Recover(VersionEdit* edit, MemTable* external_table,
return s;
}
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence,
MemTable* external_table) {
Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
bool read_only) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@ -988,6 +983,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
mutex_.AssertHeld();
VersionEdit edit;
// Open the log file
std::string fname = LogFileName(options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
@ -1017,11 +1014,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = nullptr;
if (external_table) {
mem = external_table;
}
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
bool memtable_empty = true;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
@ -1029,14 +1023,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) {
mem = new MemTable(internal_comparator_, options_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
status = WriteBatchInternal::InsertInto(&batch, mem_, &options_);
memtable_empty = false;
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
return status;
}
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
@ -1045,28 +1036,44 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
*max_sequence = last_seq;
}
if (!external_table &&
mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem, edit);
if (!read_only &&
mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem_, &edit);
// we still want to clear memtable, even if the recovery failed
delete mem_->Unref();
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
memtable_empty = true;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
return status;
}
delete mem->Unref();
mem = nullptr;
}
}
if (status.ok() && mem != nullptr && !external_table) {
status = WriteLevel0TableForRecovery(mem, edit);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
if (!memtable_empty && !read_only) {
status = WriteLevel0TableForRecovery(mem_, &edit);
delete mem_->Unref();
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
if (!status.ok()) {
return status;
}
}
if (mem != nullptr && !external_table) {
delete mem->Unref();
if (edit.NumEntries() > 0) {
// if read_only, NumEntries() will be 0
assert(!read_only);
// writing log number in the manifest means that any log file
// with number strongly less than (log_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit.SetLogNumber(log_number + 1);
status = versions_->LogAndApply(&edit, &mutex_);
}
return status;
}
@ -3826,8 +3833,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
return s;
}
impl->mutex_.Lock();
VersionEdit edit;
s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
s = impl->Recover(); // Handles create_if_missing, error_if_exists
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
@ -3839,6 +3845,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
);
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
VersionEdit edit;
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));

View File

@ -262,10 +262,8 @@ class DBImpl : public DB {
Status NewDB();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit, MemTable* external_table = nullptr,
bool error_if_log_file_exist = false);
// amount of work to recover recently logged updates.
Status Recover(bool read_only = false, bool error_if_log_file_exist = false);
void MaybeIgnoreError(Status* s) const;
@ -279,10 +277,8 @@ class DBImpl : public DB {
Status FlushMemTableToOutputFile(bool* madeProgress,
DeletionState& deletion_state);
Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence,
MemTable* external_table);
Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
bool read_only);
// The following two methods are used to flush a memtable to
// storage. The first one is used atdatabase RecoveryTime (when the

View File

@ -86,9 +86,7 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
Status s = impl->Recover(&edit, impl->GetMemTable(),
error_if_log_file_exist);
Status s = impl->Recover(true /* read only */, error_if_log_file_exist);
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;

View File

@ -669,6 +669,31 @@ class DBTest {
ASSERT_EQ(IterStatus(iter), expected_key);
delete iter;
}
void CopyFile(const std::string& source, const std::string& destination,
uint64_t size = 0) {
const EnvOptions soptions;
unique_ptr<SequentialFile> srcfile;
ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
unique_ptr<WritableFile> destfile;
ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
if (size == 0) {
// default argument means copy everything
ASSERT_OK(env_->GetFileSize(source, &size));
}
char buffer[4096];
Slice slice;
while (size > 0) {
uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
ASSERT_OK(srcfile->Read(one, &slice, buffer));
ASSERT_OK(destfile->Append(slice));
size -= slice.size();
}
ASSERT_OK(destfile->Close());
}
};
static std::string Key(int i) {
@ -1471,6 +1496,82 @@ TEST(DBTest, Recover) {
} while (ChangeOptions());
}
TEST(DBTest, IgnoreRecoveredLog) {
std::string backup_logs = dbname_ + "/backup_logs";
// delete old files in backup_logs directory
env_->CreateDirIfMissing(backup_logs);
std::vector<std::string> old_files;
env_->GetChildren(backup_logs, &old_files);
for (auto& file : old_files) {
if (file != "." && file != "..") {
env_->DeleteFile(backup_logs + "/" + file);
}
}
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
options.wal_dir = dbname_ + "/logs";
DestroyAndReopen(&options);
// fill up the DB
std::string one, two;
PutFixed64(&one, 1);
PutFixed64(&two, 2);
ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
// copy the logs to backup
std::vector<std::string> logs;
env_->GetChildren(options.wal_dir, &logs);
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
}
}
// recover the DB
Reopen(&options);
ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar"));
Close();
// copy the logs from backup back to wal dir
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
}
}
// this should ignore the log files, recovery should not happen again
// if the recovery happens, the same merge operator would be called twice,
// leading to incorrect results
Reopen(&options);
ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar"));
Close();
Destroy(&options);
// copy the logs from backup back to wal dir
env_->CreateDirIfMissing(options.wal_dir);
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
// we won't be needing this file no more
env_->DeleteFile(backup_logs + "/" + log);
}
}
// assert that we successfully recovered only from logs, even though we
// destroyed the DB
Reopen(&options);
ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar"));
Close();
} while (ChangeOptions());
}
TEST(DBTest, RollLog) {
do {
ASSERT_OK(Put("foo", "v1"));
@ -3613,7 +3714,6 @@ TEST(DBTest, BloomFilter) {
TEST(DBTest, SnapshotFiles) {
do {
Options options = CurrentOptions();
const EnvOptions soptions;
options.write_buffer_size = 100000000; // Large write buffer
Reopen(&options);
@ -3669,20 +3769,7 @@ TEST(DBTest, SnapshotFiles) {
}
}
}
unique_ptr<SequentialFile> srcfile;
ASSERT_OK(env_->NewSequentialFile(src, &srcfile, soptions));
unique_ptr<WritableFile> destfile;
ASSERT_OK(env_->NewWritableFile(dest, &destfile, soptions));
char buffer[4096];
Slice slice;
while (size > 0) {
uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
ASSERT_OK(srcfile->Read(one, &slice, buffer));
ASSERT_OK(destfile->Append(slice));
size -= slice.size();
}
ASSERT_OK(destfile->Close());
CopyFile(src, dest, size);
}
// release file snapshot