Revert "Skip deleted WALs during recovery"

This reverts commit 140c308f6f.
This commit is contained in:
Maysam Yabandeh 2018-04-23 12:19:16 -07:00
parent 544978d935
commit e049d5a12a
11 changed files with 11 additions and 246 deletions

View File

@ -72,23 +72,19 @@ TEST_F(DBFlushTest, SyncFail) {
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd();
int refs_before = cfd->current()->TEST_refs();
FlushOptions flush_options;
flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options));
// Flush installs a new super-version. Get the ref count after that.
auto current_before = cfd->current();
int refs_before = cfd->current()->TEST_refs();
fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true);
// Now the background job will do the flush; wait for it.
dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE
// Backgroun flush job should release ref count to current version.
ASSERT_EQ(current_before, cfd->current());
// Flush job should release ref count to current version.
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options);
}

View File

@ -332,8 +332,6 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
}; // namespace
// Delete obsolete files and log status and information of file deletion
// Note: All WAL files must be deleted through this function (unelss they are
// archived) to ensure that maniefest is updated properly.
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
FileType type, uint64_t number,
uint32_t path_id) {
@ -342,16 +340,6 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
file_deletion_status =
DeleteSSTFile(&immutable_db_options_, fname, path_id);
} else {
if (type == kLogFile) {
// Before deleting the file, mark file as deleted in the manifest
VersionEdit edit;
edit.SetDeletedLogNumber(number);
auto edit_cfd = versions_->GetColumnFamilySet()->GetDefault();
auto edit_cf_opts = edit_cfd->GetLatestMutableCFOptions();
mutex_.Lock();
versions_->LogAndApply(edit_cfd, *edit_cf_opts, &edit, &mutex_);
mutex_.Unlock();
}
file_deletion_status = env_->DeleteFile(fname);
}
TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",

View File

@ -528,13 +528,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool flushed = false;
uint64_t corrupted_log_number = kMaxSequenceNumber;
for (auto log_number : log_numbers) {
if (log_number <= versions_->latest_deleted_log_number()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is not newer than latest deleted log #%" PRIu64,
log_number, versions_->latest_deleted_log_number());
continue;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.

View File

@ -451,9 +451,8 @@ class SpecialEnv : public EnvWrapper {
return s;
}
virtual Status NewSequentialFile(const std::string& f,
unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public SequentialFile {
public:
CountingFile(unique_ptr<SequentialFile>&& target,

View File

@ -20,106 +20,6 @@ class DBWALTest : public DBTestBase {
DBWALTest() : DBTestBase("/db_wal_test") {}
};
// A SpecialEnv enriched to give more insight about deleted files
class EnrichedSpecialEnv : public SpecialEnv {
public:
explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
InstrumentedMutexLock l(&env_mutex_);
if (f == skipped_wal) {
deleted_wal_reopened = true;
if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
f.compare(largetest_deleted_wal) <= 0) {
gap_in_wals = true;
}
}
return SpecialEnv::NewSequentialFile(f, r, soptions);
}
Status DeleteFile(const std::string& fname) override {
if (IsWAL(fname)) {
deleted_wal_cnt++;
InstrumentedMutexLock l(&env_mutex_);
// If this is the first WAL, remember its name and skip deleting it. We
// remember its name partly because the application might attempt to
// delete the file again.
if (skipped_wal.size() != 0 && skipped_wal != fname) {
if (largetest_deleted_wal.size() == 0 ||
largetest_deleted_wal.compare(fname) < 0) {
largetest_deleted_wal = fname;
}
} else {
skipped_wal = fname;
return Status::OK();
}
}
return SpecialEnv::DeleteFile(fname);
}
bool IsWAL(const std::string& fname) {
// printf("iswal %s\n", fname.c_str());
return fname.compare(fname.size() - 3, 3, "log") == 0;
}
InstrumentedMutex env_mutex_;
// the wal whose actual delete was skipped by the env
std::string skipped_wal = "";
// the largest WAL that was requested to be deleted
std::string largetest_deleted_wal = "";
// number of WALs that were successfully deleted
std::atomic<size_t> deleted_wal_cnt = {0};
// the WAL whose delete from fs was skipped is reopened during recovery
std::atomic<bool> deleted_wal_reopened = {false};
// whether a gap in the WALs was detected during recovery
std::atomic<bool> gap_in_wals = {false};
};
class DBWALTestWithEnrichedEnv : public DBTestBase {
public:
DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
enriched_env_ = new EnrichedSpecialEnv(env_->target());
auto options = CurrentOptions();
options.env = enriched_env_;
Reopen(options);
delete env_;
// to be deleted by the parent class
env_ = enriched_env_;
}
protected:
EnrichedSpecialEnv* enriched_env_;
};
// Test that the recovery would successfully avoid the gaps between the logs.
// One known scenario that could cause this is that the application issue the
// WAL deletion out of order. For the sake of simplicity in the test, here we
// create the gap by manipulating the env to skip deletion of the first WAL but
// not the ones after it.
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
auto options = last_options_;
// To cause frequent WAL deletion
options.write_buffer_size = 128;
Reopen(options);
WriteOptions writeOpt = WriteOptions();
for (int i = 0; i < 128 * 5; i++) {
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
}
FlushOptions fo;
fo.wait = true;
ASSERT_OK(db_->Flush(fo));
// some wals are deleted
ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
// but not the first one
ASSERT_NE(0, enriched_env_->skipped_wal.size());
// Test that the WAL that was not deleted will be skipped during recovery
options = last_options_;
Reopen(options);
ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
ASSERT_FALSE(enriched_env_->gap_in_wals);
}
TEST_F(DBWALTest, WAL) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
@ -991,7 +891,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
// Record the offset at this point
Env* env = options.env;
uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1;
std::string fname = LogFileName(dbname_, wal_file_id);
uint64_t offset_to_corrupt;
ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));

View File

@ -1413,12 +1413,8 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
// fit in L3 but will overlap with compaction so will be added
// to L2 but a compaction will trivially move it to L3
// and break LSM consistency
static std::atomic<bool> called = {false};
if (!called) {
called = true;
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
}
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

View File

@ -30,7 +30,6 @@ enum Tag {
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9,
kDeletedLogNumber = 10,
// these are new formats divergent from open source leveldb
kNewFile2 = 100,
@ -45,11 +44,6 @@ enum Tag {
enum CustomTag {
kTerminate = 1, // The end of customized fields
kNeedCompaction = 2,
// Since Manifest is not entirely currently forward-compatible, and the only
// forward-compatbile part is the CutsomtTag of kNewFile, we currently encode
// kDeletedLogNumber as part of a CustomTag as a hack. This should be removed
// when manifest becomes forward-comptabile.
kDeletedLogNumberHack = 3,
kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
@ -69,14 +63,12 @@ void VersionEdit::Clear() {
last_sequence_ = 0;
next_file_number_ = 0;
max_column_family_ = 0;
deleted_log_number_ = 0;
has_comparator_ = false;
has_log_number_ = false;
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;
has_max_column_family_ = false;
has_deleted_log_number_ = false;
deleted_files_.clear();
new_files_.clear();
column_family_ = 0;
@ -105,24 +97,6 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (has_max_column_family_) {
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
}
if (has_deleted_log_number_) {
// TODO(myabandeh): uncomment me when manifest is forward-compatible
// PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_);
// Since currently manifest is not forward compatible we encode this entry
// disguised as a kNewFile4 entry which has forward-compatible extensions.
PutVarint32(dst, kNewFile4);
PutVarint32Varint64(dst, 0u, 0ull); // level and number
PutVarint64(dst, 0ull); // file size
InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue);
PutLengthPrefixedSlice(dst, dummy_key.Encode()); // smallest
PutLengthPrefixedSlice(dst, dummy_key.Encode()); // largest
PutVarint64Varint64(dst, 0ull, 0ull); // smallest_seqno and largerst
PutVarint32(dst, CustomTag::kDeletedLogNumberHack);
std::string buf;
PutFixed64(&buf, deleted_log_number_);
PutLengthPrefixedSlice(dst, Slice(buf));
PutVarint32(dst, CustomTag::kTerminate);
}
for (const auto& deleted : deleted_files_) {
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
@ -244,10 +218,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
uint64_t number;
uint32_t path_id = 0;
uint64_t file_size;
// Since this is the only forward-compatible part of the code, we hack new
// extension into this record. When we do, we set this boolean to distinguish
// the record from the normal NewFile records.
bool this_is_not_a_new_file_record = false;
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
GetInternalKey(input, &f.largest) &&
@ -282,15 +252,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
}
f.marked_for_compaction = (field[0] == 1);
break;
case kDeletedLogNumberHack:
// This is a hack to encode kDeletedLogNumber in a forward-compatbile
// fashion.
this_is_not_a_new_file_record = true;
if (!GetFixed64(&field, &deleted_log_number_)) {
return "deleted log number malformatted";
}
has_deleted_log_number_ = true;
break;
default:
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
// Should not proceed if cannot understand it
@ -302,10 +263,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
} else {
return "new-file4 entry";
}
if (this_is_not_a_new_file_record) {
// Since this has nothing to do with NewFile, return immediately.
return nullptr;
}
f.fd = FileDescriptor(number, path_id, file_size);
new_files_.push_back(std::make_pair(level, f));
return nullptr;
@ -374,14 +331,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kDeletedLogNumber:
if (GetVarint64(&input, &deleted_log_number_)) {
has_deleted_log_number_ = true;
} else {
msg = "deleted log number";
}
break;
case kCompactPointer:
if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) {
@ -564,10 +513,6 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append("\n MaxColumnFamily: ");
AppendNumberTo(&r, max_column_family_);
}
if (has_deleted_log_number_) {
r.append("\n DeletedLogNumber: ");
AppendNumberTo(&r, deleted_log_number_);
}
r.append("\n}\n");
return r;
}
@ -637,9 +582,6 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
if (has_max_column_family_) {
jw << "MaxColumnFamily" << max_column_family_;
}
if (has_deleted_log_number_) {
jw << "DeletedLogNumber" << deleted_log_number_;
}
jw.EndObject();

View File

@ -199,10 +199,6 @@ class VersionEdit {
has_max_column_family_ = true;
max_column_family_ = max_column_family;
}
void SetDeletedLogNumber(uint64_t num) {
has_deleted_log_number_ = true;
deleted_log_number_ = num;
}
// Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
@ -289,8 +285,6 @@ class VersionEdit {
uint64_t prev_log_number_;
uint64_t next_file_number_;
uint32_t max_column_family_;
// The most recent WAL log number that is deleted
uint64_t deleted_log_number_;
SequenceNumber last_sequence_;
bool has_comparator_;
bool has_log_number_;
@ -298,7 +292,6 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;
bool has_max_column_family_;
bool has_deleted_log_number_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;

View File

@ -181,16 +181,6 @@ TEST_F(VersionEditTest, ColumnFamilyTest) {
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, DeletedLogNumber) {
VersionEdit edit;
edit.SetDeletedLogNumber(13);
TestEncodeDecode(edit);
edit.Clear();
edit.SetDeletedLogNumber(23);
TestEncodeDecode(edit);
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -3107,7 +3107,6 @@ Status VersionSet::Recover(
uint64_t log_number = 0;
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint64_t deleted_log_number = 0;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
// add default column family
@ -3248,11 +3247,6 @@ Status VersionSet::Recover(
max_column_family = edit.max_column_family_;
}
if (edit.has_deleted_log_number_) {
deleted_log_number =
std::max(deleted_log_number, edit.deleted_log_number_);
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
@ -3275,7 +3269,6 @@ Status VersionSet::Recover(
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkDeletedLogNumber(deleted_log_number);
MarkFileNumberUsed(previous_log_number);
MarkFileNumberUsed(log_number);
}
@ -3347,12 +3340,11 @@ Status VersionSet::Recover(
"manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu,"
"max_column_family is %u,"
"deleted_log_number is %lu\n",
"max_column_family is %u\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
column_family_set_->GetMaxColumnFamily());
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
@ -3658,10 +3650,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (edit.has_max_column_family_) {
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
}
if (edit.has_deleted_log_number_) {
MarkDeletedLogNumber(edit.deleted_log_number_);
}
}
}
file_reader.reset();
@ -3720,11 +3708,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
printf(
"next_file_number %lu last_sequence "
"%lu prev_log_number %lu max_column_family %u deleted_log_number "
"%" PRIu64 "\n",
"%lu prev_log_number %lu max_column_family %u\n",
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
(unsigned long)previous_log_number,
column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
column_family_set_->GetMaxColumnFamily());
}
return s;
@ -3739,14 +3726,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
}
}
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkDeletedLogNumber(uint64_t number) {
if (latest_deleted_log_number_.load(std::memory_order_relaxed) < number) {
latest_deleted_log_number_.store(number, std::memory_order_relaxed);
}
}
Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?

View File

@ -772,10 +772,6 @@ class VersionSet {
uint64_t current_next_file_number() const { return next_file_number_.load(); }
uint64_t latest_deleted_log_number() const {
return latest_deleted_log_number_.load();
}
// Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
@ -823,11 +819,6 @@ class VersionSet {
// REQUIRED: this is only called during single-threaded recovery or repair.
void MarkFileNumberUsed(uint64_t number);
// Mark the specified log number as deleted
// REQUIRED: this is only called during single-threaded recovery or repair, or
// from ::LogAndApply where the global mutex is held.
void MarkDeletedLogNumber(uint64_t number);
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t prev_log_number() const { return prev_log_number_; }
@ -925,8 +916,6 @@ class VersionSet {
const std::string dbname_;
const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_;
// Any log number equal or lower than this should be ignored during recovery.
std::atomic<uint64_t> latest_deleted_log_number_ = {0};
uint64_t manifest_file_number_;
uint64_t options_file_number_;
uint64_t pending_manifest_file_number_;