From 140c308f6f4cafd174c1b99a378b67a2cbfd4938 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Fri, 30 Mar 2018 11:14:41 -0700 Subject: [PATCH] Skip deleted WALs during recovery Summary: This patch record the deleted WAL numbers in the manifest to ignore them and any WAL older than them during recovery. This is to avoid scenarios when we have a gap between the WAL files are fed to the recovery procedure. The gap could happen by for example out-of-order WAL deletion. Such gap could cause problems in 2PC recovery where the prepared and commit entry are placed into two separate WAL and gap in the WALs could result into not processing the WAL with the commit entry and hence breaking the 2PC recovery logic. Closes https://github.com/facebook/rocksdb/pull/3488 Differential Revision: D6967893 Pulled By: maysamyabandeh fbshipit-source-id: 13119feb155a08ab6d4909f437c7a750480dc8a1 --- db/db_flush_test.cc | 8 ++- db/db_impl_files.cc | 12 +++++ db/db_impl_open.cc | 7 +++ db/db_test_util.h | 5 +- db/db_wal_test.cc | 102 ++++++++++++++++++++++++++++++++++- db/external_sst_file_test.cc | 8 ++- db/version_edit.cc | 58 ++++++++++++++++++++ db/version_edit.h | 7 +++ db/version_edit_test.cc | 10 ++++ db/version_set.cc | 29 ++++++++-- db/version_set.h | 11 ++++ 11 files changed, 246 insertions(+), 11 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 87f894a7b..0e6dd0bf5 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -72,19 +72,23 @@ TEST_F(DBFlushTest, SyncFail) { auto* cfd = reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(); - int refs_before = cfd->current()->TEST_refs(); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(dbfull()->Flush(flush_options)); + // Flush installs a new super-version. Get the ref count after that. + auto current_before = cfd->current(); + int refs_before = cfd->current()->TEST_refs(); fault_injection_env->SetFilesystemActive(false); TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); + // Now the background job will do the flush; wait for it. dbfull()->TEST_WaitForFlushMemTable(); #ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. #endif // ROCKSDB_LITE - // Flush job should release ref count to current version. + // Backgroun flush job should release ref count to current version. + ASSERT_EQ(current_before, cfd->current()); ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); Destroy(options); } diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 983787129..c3e698e98 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -332,6 +332,8 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, }; // namespace // Delete obsolete files and log status and information of file deletion +// Note: All WAL files must be deleted through this function (unelss they are +// archived) to ensure that maniefest is updated properly. void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, FileType type, uint64_t number, uint32_t path_id) { @@ -340,6 +342,16 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, file_deletion_status = DeleteSSTFile(&immutable_db_options_, fname, path_id); } else { + if (type == kLogFile) { + // Before deleting the file, mark file as deleted in the manifest + VersionEdit edit; + edit.SetDeletedLogNumber(number); + auto edit_cfd = versions_->GetColumnFamilySet()->GetDefault(); + auto edit_cf_opts = edit_cfd->GetLatestMutableCFOptions(); + mutex_.Lock(); + versions_->LogAndApply(edit_cfd, *edit_cf_opts, &edit, &mutex_); + mutex_.Unlock(); + } file_deletion_status = env_->DeleteFile(fname); } TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion", diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 047a17b21..6e05af70a 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -529,6 +529,13 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool flushed = false; uint64_t corrupted_log_number = kMaxSequenceNumber; for (auto log_number : log_numbers) { + if (log_number <= versions_->latest_deleted_log_number()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Skipping log #%" PRIu64 + " since it is not newer than latest deleted log #%" PRIu64, + log_number, versions_->latest_deleted_log_number()); + continue; + } // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. diff --git a/db/db_test_util.h b/db/db_test_util.h index 936823eff..0c3da91ce 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -451,8 +451,9 @@ class SpecialEnv : public EnvWrapper { return s; } - Status NewSequentialFile(const std::string& f, unique_ptr* r, - const EnvOptions& soptions) override { + virtual Status NewSequentialFile(const std::string& f, + unique_ptr* r, + const EnvOptions& soptions) override { class CountingFile : public SequentialFile { public: CountingFile(unique_ptr&& target, diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 796ef251c..0349bdc8d 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -20,6 +20,106 @@ class DBWALTest : public DBTestBase { DBWALTest() : DBTestBase("/db_wal_test") {} }; +// A SpecialEnv enriched to give more insight about deleted files +class EnrichedSpecialEnv : public SpecialEnv { + public: + explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {} + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) override { + InstrumentedMutexLock l(&env_mutex_); + if (f == skipped_wal) { + deleted_wal_reopened = true; + if (IsWAL(f) && largetest_deleted_wal.size() != 0 && + f.compare(largetest_deleted_wal) <= 0) { + gap_in_wals = true; + } + } + return SpecialEnv::NewSequentialFile(f, r, soptions); + } + Status DeleteFile(const std::string& fname) override { + if (IsWAL(fname)) { + deleted_wal_cnt++; + InstrumentedMutexLock l(&env_mutex_); + // If this is the first WAL, remember its name and skip deleting it. We + // remember its name partly because the application might attempt to + // delete the file again. + if (skipped_wal.size() != 0 && skipped_wal != fname) { + if (largetest_deleted_wal.size() == 0 || + largetest_deleted_wal.compare(fname) < 0) { + largetest_deleted_wal = fname; + } + } else { + skipped_wal = fname; + return Status::OK(); + } + } + return SpecialEnv::DeleteFile(fname); + } + bool IsWAL(const std::string& fname) { + // printf("iswal %s\n", fname.c_str()); + return fname.compare(fname.size() - 3, 3, "log") == 0; + } + + InstrumentedMutex env_mutex_; + // the wal whose actual delete was skipped by the env + std::string skipped_wal = ""; + // the largest WAL that was requested to be deleted + std::string largetest_deleted_wal = ""; + // number of WALs that were successfully deleted + std::atomic deleted_wal_cnt = {0}; + // the WAL whose delete from fs was skipped is reopened during recovery + std::atomic deleted_wal_reopened = {false}; + // whether a gap in the WALs was detected during recovery + std::atomic gap_in_wals = {false}; +}; + +class DBWALTestWithEnrichedEnv : public DBTestBase { + public: + DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") { + enriched_env_ = new EnrichedSpecialEnv(env_->target()); + auto options = CurrentOptions(); + options.env = enriched_env_; + Reopen(options); + delete env_; + // to be deleted by the parent class + env_ = enriched_env_; + } + + protected: + EnrichedSpecialEnv* enriched_env_; +}; + +// Test that the recovery would successfully avoid the gaps between the logs. +// One known scenario that could cause this is that the application issue the +// WAL deletion out of order. For the sake of simplicity in the test, here we +// create the gap by manipulating the env to skip deletion of the first WAL but +// not the ones after it. +TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) { + auto options = last_options_; + // To cause frequent WAL deletion + options.write_buffer_size = 128; + Reopen(options); + + WriteOptions writeOpt = WriteOptions(); + for (int i = 0; i < 128 * 5; i++) { + ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); + } + FlushOptions fo; + fo.wait = true; + ASSERT_OK(db_->Flush(fo)); + + // some wals are deleted + ASSERT_NE(0, enriched_env_->deleted_wal_cnt); + // but not the first one + ASSERT_NE(0, enriched_env_->skipped_wal.size()); + + // Test that the WAL that was not deleted will be skipped during recovery + options = last_options_; + Reopen(options); + ASSERT_FALSE(enriched_env_->deleted_wal_reopened); + ASSERT_FALSE(enriched_env_->gap_in_wals); +} + TEST_F(DBWALTest, WAL) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -891,7 +991,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { // Record the offset at this point Env* env = options.env; - int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1; + uint64_t wal_file_id = dbfull()->TEST_LogfileNumber(); std::string fname = LogFileName(dbname_, wal_file_id); uint64_t offset_to_corrupt; ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index fcdf07adc..aeaee8fc0 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1413,8 +1413,12 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { // fit in L3 but will overlap with compaction so will be added // to L2 but a compaction will trivially move it to L3 // and break LSM consistency - ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); - ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + static std::atomic called = {false}; + if (!called) { + called = true; + ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); + } }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/version_edit.cc b/db/version_edit.cc index ebfc10584..b966122a3 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -30,6 +30,7 @@ enum Tag { kNewFile = 7, // 8 was used for large value refs kPrevLogNumber = 9, + kDeletedLogNumber = 10, // these are new formats divergent from open source leveldb kNewFile2 = 100, @@ -44,6 +45,11 @@ enum Tag { enum CustomTag { kTerminate = 1, // The end of customized fields kNeedCompaction = 2, + // Since Manifest is not entirely currently forward-compatible, and the only + // forward-compatbile part is the CutsomtTag of kNewFile, we currently encode + // kDeletedLogNumber as part of a CustomTag as a hack. This should be removed + // when manifest becomes forward-comptabile. + kDeletedLogNumberHack = 3, kPathId = 65, }; // If this bit for the custom tag is set, opening DB should fail if @@ -63,12 +69,14 @@ void VersionEdit::Clear() { last_sequence_ = 0; next_file_number_ = 0; max_column_family_ = 0; + deleted_log_number_ = 0; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; has_max_column_family_ = false; + has_deleted_log_number_ = false; deleted_files_.clear(); new_files_.clear(); column_family_ = 0; @@ -97,6 +105,24 @@ bool VersionEdit::EncodeTo(std::string* dst) const { if (has_max_column_family_) { PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_); } + if (has_deleted_log_number_) { + // TODO(myabandeh): uncomment me when manifest is forward-compatible + // PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_); + // Since currently manifest is not forward compatible we encode this entry + // disguised as a kNewFile4 entry which has forward-compatible extensions. + PutVarint32(dst, kNewFile4); + PutVarint32Varint64(dst, 0u, 0ull); // level and number + PutVarint64(dst, 0ull); // file size + InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue); + PutLengthPrefixedSlice(dst, dummy_key.Encode()); // smallest + PutLengthPrefixedSlice(dst, dummy_key.Encode()); // largest + PutVarint64Varint64(dst, 0ull, 0ull); // smallest_seqno and largerst + PutVarint32(dst, CustomTag::kDeletedLogNumberHack); + std::string buf; + PutFixed64(&buf, deleted_log_number_); + PutLengthPrefixedSlice(dst, Slice(buf)); + PutVarint32(dst, CustomTag::kTerminate); + } for (const auto& deleted : deleted_files_) { PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, @@ -218,6 +244,10 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { uint64_t number; uint32_t path_id = 0; uint64_t file_size; + // Since this is the only forward-compatible part of the code, we hack new + // extension into this record. When we do, we set this boolean to distinguish + // the record from the normal NewFile records. + bool this_is_not_a_new_file_record = false; if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetInternalKey(input, &f.largest) && @@ -252,6 +282,15 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } f.marked_for_compaction = (field[0] == 1); break; + case kDeletedLogNumberHack: + // This is a hack to encode kDeletedLogNumber in a forward-compatbile + // fashion. + this_is_not_a_new_file_record = true; + if (!GetFixed64(&field, &deleted_log_number_)) { + return "deleted log number malformatted"; + } + has_deleted_log_number_ = true; + break; default: if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) { // Should not proceed if cannot understand it @@ -263,6 +302,10 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } else { return "new-file4 entry"; } + if (this_is_not_a_new_file_record) { + // Since this has nothing to do with NewFile, return immediately. + return nullptr; + } f.fd = FileDescriptor(number, path_id, file_size); new_files_.push_back(std::make_pair(level, f)); return nullptr; @@ -331,6 +374,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kDeletedLogNumber: + if (GetVarint64(&input, &deleted_log_number_)) { + has_deleted_log_number_ = true; + } else { + msg = "deleted log number"; + } + break; + case kCompactPointer: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { @@ -513,6 +564,10 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append("\n MaxColumnFamily: "); AppendNumberTo(&r, max_column_family_); } + if (has_deleted_log_number_) { + r.append("\n DeletedLogNumber: "); + AppendNumberTo(&r, deleted_log_number_); + } r.append("\n}\n"); return r; } @@ -582,6 +637,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { if (has_max_column_family_) { jw << "MaxColumnFamily" << max_column_family_; } + if (has_deleted_log_number_) { + jw << "DeletedLogNumber" << deleted_log_number_; + } jw.EndObject(); diff --git a/db/version_edit.h b/db/version_edit.h index 391e61434..f826b11bc 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -199,6 +199,10 @@ class VersionEdit { has_max_column_family_ = true; max_column_family_ = max_column_family; } + void SetDeletedLogNumber(uint64_t num) { + has_deleted_log_number_ = true; + deleted_log_number_ = num; + } // Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) @@ -285,6 +289,8 @@ class VersionEdit { uint64_t prev_log_number_; uint64_t next_file_number_; uint32_t max_column_family_; + // The most recent WAL log number that is deleted + uint64_t deleted_log_number_; SequenceNumber last_sequence_; bool has_comparator_; bool has_log_number_; @@ -292,6 +298,7 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; bool has_max_column_family_; + bool has_deleted_log_number_; DeletedFileSet deleted_files_; std::vector> new_files_; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 338bb36f6..0fc4a5e75 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -181,6 +181,16 @@ TEST_F(VersionEditTest, ColumnFamilyTest) { TestEncodeDecode(edit); } +TEST_F(VersionEditTest, DeletedLogNumber) { + VersionEdit edit; + edit.SetDeletedLogNumber(13); + TestEncodeDecode(edit); + + edit.Clear(); + edit.SetDeletedLogNumber(23); + TestEncodeDecode(edit); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index b7a62d5e7..d2bfcc8de 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3076,6 +3076,7 @@ Status VersionSet::Recover( uint64_t log_number = 0; uint64_t previous_log_number = 0; uint32_t max_column_family = 0; + uint64_t deleted_log_number = 0; std::unordered_map builders; // add default column family @@ -3216,6 +3217,11 @@ Status VersionSet::Recover( max_column_family = edit.max_column_family_; } + if (edit.has_deleted_log_number_) { + deleted_log_number = + std::max(deleted_log_number, edit.deleted_log_number_); + } + if (edit.has_last_sequence_) { last_sequence = edit.last_sequence_; have_last_sequence = true; @@ -3238,6 +3244,7 @@ Status VersionSet::Recover( column_family_set_->UpdateMaxColumnFamily(max_column_family); + MarkDeletedLogNumber(deleted_log_number); MarkFileNumberUsed(previous_log_number); MarkFileNumberUsed(log_number); } @@ -3309,11 +3316,12 @@ Status VersionSet::Recover( "manifest_file_number is %lu, next_file_number is %lu, " "last_sequence is %lu, log_number is %lu," "prev_log_number is %lu," - "max_column_family is %u\n", + "max_column_family is %u," + "deleted_log_number is %lu\n", manifest_filename.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, - column_family_set_->GetMaxColumnFamily()); + column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -3619,6 +3627,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, if (edit.has_max_column_family_) { column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); } + + if (edit.has_deleted_log_number_) { + MarkDeletedLogNumber(edit.deleted_log_number_); + } } } file_reader.reset(); @@ -3677,10 +3689,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, printf( "next_file_number %lu last_sequence " - "%lu prev_log_number %lu max_column_family %u\n", + "%lu prev_log_number %lu max_column_family %u deleted_log_number " + "%" PRIu64 "\n", (unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)previous_log_number, - column_family_set_->GetMaxColumnFamily()); + column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number()); } return s; @@ -3695,6 +3708,14 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } } +// Called only either from ::LogAndApply which is protected by mutex or during +// recovery which is single-threaded. +void VersionSet::MarkDeletedLogNumber(uint64_t number) { + if (latest_deleted_log_number_.load(std::memory_order_relaxed) < number) { + latest_deleted_log_number_.store(number, std::memory_order_relaxed); + } +} + Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? diff --git a/db/version_set.h b/db/version_set.h index 832857f63..bf92b200c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -759,6 +759,10 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } + uint64_t latest_deleted_log_number() const { + return latest_deleted_log_number_.load(); + } + // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } @@ -806,6 +810,11 @@ class VersionSet { // REQUIRED: this is only called during single-threaded recovery or repair. void MarkFileNumberUsed(uint64_t number); + // Mark the specified log number as deleted + // REQUIRED: this is only called during single-threaded recovery or repair, or + // from ::LogAndApply where the global mutex is held. + void MarkDeletedLogNumber(uint64_t number); + // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_t prev_log_number() const { return prev_log_number_; } @@ -903,6 +912,8 @@ class VersionSet { const std::string dbname_; const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; + // Any log number equal or lower than this should be ignored during recovery. + std::atomic latest_deleted_log_number_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_;