From e049d5a12a6f29d270e5a3b869c75926c0990c87 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 23 Apr 2018 12:19:16 -0700 Subject: [PATCH] Revert "Skip deleted WALs during recovery" This reverts commit 140c308f6f4cafd174c1b99a378b67a2cbfd4938. --- db/db_flush_test.cc | 8 +-- db/db_impl_files.cc | 12 ----- db/db_impl_open.cc | 7 --- db/db_test_util.h | 5 +- db/db_wal_test.cc | 102 +---------------------------------- db/external_sst_file_test.cc | 8 +-- db/version_edit.cc | 58 -------------------- db/version_edit.h | 7 --- db/version_edit_test.cc | 10 ---- db/version_set.cc | 29 ++-------- db/version_set.h | 11 ---- 11 files changed, 11 insertions(+), 246 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 0e6dd0bf5..87f894a7b 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -72,23 +72,19 @@ TEST_F(DBFlushTest, SyncFail) { auto* cfd = reinterpret_cast(db_->DefaultColumnFamily()) ->cfd(); + int refs_before = cfd->current()->TEST_refs(); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(dbfull()->Flush(flush_options)); - // Flush installs a new super-version. Get the ref count after that. - auto current_before = cfd->current(); - int refs_before = cfd->current()->TEST_refs(); fault_injection_env->SetFilesystemActive(false); TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); fault_injection_env->SetFilesystemActive(true); - // Now the background job will do the flush; wait for it. dbfull()->TEST_WaitForFlushMemTable(); #ifndef ROCKSDB_LITE ASSERT_EQ("", FilesPerLevel()); // flush failed. #endif // ROCKSDB_LITE - // Backgroun flush job should release ref count to current version. - ASSERT_EQ(current_before, cfd->current()); + // Flush job should release ref count to current version. ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); Destroy(options); } diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index c3e698e98..983787129 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -332,8 +332,6 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, }; // namespace // Delete obsolete files and log status and information of file deletion -// Note: All WAL files must be deleted through this function (unelss they are -// archived) to ensure that maniefest is updated properly. void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, FileType type, uint64_t number, uint32_t path_id) { @@ -342,16 +340,6 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, file_deletion_status = DeleteSSTFile(&immutable_db_options_, fname, path_id); } else { - if (type == kLogFile) { - // Before deleting the file, mark file as deleted in the manifest - VersionEdit edit; - edit.SetDeletedLogNumber(number); - auto edit_cfd = versions_->GetColumnFamilySet()->GetDefault(); - auto edit_cf_opts = edit_cfd->GetLatestMutableCFOptions(); - mutex_.Lock(); - versions_->LogAndApply(edit_cfd, *edit_cf_opts, &edit, &mutex_); - mutex_.Unlock(); - } file_deletion_status = env_->DeleteFile(fname); } TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion", diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 78e871c58..364461964 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -528,13 +528,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, bool flushed = false; uint64_t corrupted_log_number = kMaxSequenceNumber; for (auto log_number : log_numbers) { - if (log_number <= versions_->latest_deleted_log_number()) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Skipping log #%" PRIu64 - " since it is not newer than latest deleted log #%" PRIu64, - log_number, versions_->latest_deleted_log_number()); - continue; - } // The previous incarnation may not have written any MANIFEST // records after allocating this log number. So we manually // update the file number allocation counter in VersionSet. diff --git a/db/db_test_util.h b/db/db_test_util.h index 0c3da91ce..936823eff 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -451,9 +451,8 @@ class SpecialEnv : public EnvWrapper { return s; } - virtual Status NewSequentialFile(const std::string& f, - unique_ptr* r, - const EnvOptions& soptions) override { + Status NewSequentialFile(const std::string& f, unique_ptr* r, + const EnvOptions& soptions) override { class CountingFile : public SequentialFile { public: CountingFile(unique_ptr&& target, diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 0349bdc8d..796ef251c 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -20,106 +20,6 @@ class DBWALTest : public DBTestBase { DBWALTest() : DBTestBase("/db_wal_test") {} }; -// A SpecialEnv enriched to give more insight about deleted files -class EnrichedSpecialEnv : public SpecialEnv { - public: - explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {} - Status NewSequentialFile(const std::string& f, unique_ptr* r, - const EnvOptions& soptions) override { - InstrumentedMutexLock l(&env_mutex_); - if (f == skipped_wal) { - deleted_wal_reopened = true; - if (IsWAL(f) && largetest_deleted_wal.size() != 0 && - f.compare(largetest_deleted_wal) <= 0) { - gap_in_wals = true; - } - } - return SpecialEnv::NewSequentialFile(f, r, soptions); - } - Status DeleteFile(const std::string& fname) override { - if (IsWAL(fname)) { - deleted_wal_cnt++; - InstrumentedMutexLock l(&env_mutex_); - // If this is the first WAL, remember its name and skip deleting it. We - // remember its name partly because the application might attempt to - // delete the file again. - if (skipped_wal.size() != 0 && skipped_wal != fname) { - if (largetest_deleted_wal.size() == 0 || - largetest_deleted_wal.compare(fname) < 0) { - largetest_deleted_wal = fname; - } - } else { - skipped_wal = fname; - return Status::OK(); - } - } - return SpecialEnv::DeleteFile(fname); - } - bool IsWAL(const std::string& fname) { - // printf("iswal %s\n", fname.c_str()); - return fname.compare(fname.size() - 3, 3, "log") == 0; - } - - InstrumentedMutex env_mutex_; - // the wal whose actual delete was skipped by the env - std::string skipped_wal = ""; - // the largest WAL that was requested to be deleted - std::string largetest_deleted_wal = ""; - // number of WALs that were successfully deleted - std::atomic deleted_wal_cnt = {0}; - // the WAL whose delete from fs was skipped is reopened during recovery - std::atomic deleted_wal_reopened = {false}; - // whether a gap in the WALs was detected during recovery - std::atomic gap_in_wals = {false}; -}; - -class DBWALTestWithEnrichedEnv : public DBTestBase { - public: - DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") { - enriched_env_ = new EnrichedSpecialEnv(env_->target()); - auto options = CurrentOptions(); - options.env = enriched_env_; - Reopen(options); - delete env_; - // to be deleted by the parent class - env_ = enriched_env_; - } - - protected: - EnrichedSpecialEnv* enriched_env_; -}; - -// Test that the recovery would successfully avoid the gaps between the logs. -// One known scenario that could cause this is that the application issue the -// WAL deletion out of order. For the sake of simplicity in the test, here we -// create the gap by manipulating the env to skip deletion of the first WAL but -// not the ones after it. -TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) { - auto options = last_options_; - // To cause frequent WAL deletion - options.write_buffer_size = 128; - Reopen(options); - - WriteOptions writeOpt = WriteOptions(); - for (int i = 0; i < 128 * 5; i++) { - ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); - } - FlushOptions fo; - fo.wait = true; - ASSERT_OK(db_->Flush(fo)); - - // some wals are deleted - ASSERT_NE(0, enriched_env_->deleted_wal_cnt); - // but not the first one - ASSERT_NE(0, enriched_env_->skipped_wal.size()); - - // Test that the WAL that was not deleted will be skipped during recovery - options = last_options_; - Reopen(options); - ASSERT_FALSE(enriched_env_->deleted_wal_reopened); - ASSERT_FALSE(enriched_env_->gap_in_wals); -} - TEST_F(DBWALTest, WAL) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); @@ -991,7 +891,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { // Record the offset at this point Env* env = options.env; - uint64_t wal_file_id = dbfull()->TEST_LogfileNumber(); + int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1; std::string fname = LogFileName(dbname_, wal_file_id); uint64_t offset_to_corrupt; ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index aeaee8fc0..fcdf07adc 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1413,12 +1413,8 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) { // fit in L3 but will overlap with compaction so will be added // to L2 but a compaction will trivially move it to L3 // and break LSM consistency - static std::atomic called = {false}; - if (!called) { - called = true; - ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); - ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); - } + ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); + ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/db/version_edit.cc b/db/version_edit.cc index b966122a3..ebfc10584 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -30,7 +30,6 @@ enum Tag { kNewFile = 7, // 8 was used for large value refs kPrevLogNumber = 9, - kDeletedLogNumber = 10, // these are new formats divergent from open source leveldb kNewFile2 = 100, @@ -45,11 +44,6 @@ enum Tag { enum CustomTag { kTerminate = 1, // The end of customized fields kNeedCompaction = 2, - // Since Manifest is not entirely currently forward-compatible, and the only - // forward-compatbile part is the CutsomtTag of kNewFile, we currently encode - // kDeletedLogNumber as part of a CustomTag as a hack. This should be removed - // when manifest becomes forward-comptabile. - kDeletedLogNumberHack = 3, kPathId = 65, }; // If this bit for the custom tag is set, opening DB should fail if @@ -69,14 +63,12 @@ void VersionEdit::Clear() { last_sequence_ = 0; next_file_number_ = 0; max_column_family_ = 0; - deleted_log_number_ = 0; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; has_max_column_family_ = false; - has_deleted_log_number_ = false; deleted_files_.clear(); new_files_.clear(); column_family_ = 0; @@ -105,24 +97,6 @@ bool VersionEdit::EncodeTo(std::string* dst) const { if (has_max_column_family_) { PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_); } - if (has_deleted_log_number_) { - // TODO(myabandeh): uncomment me when manifest is forward-compatible - // PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_); - // Since currently manifest is not forward compatible we encode this entry - // disguised as a kNewFile4 entry which has forward-compatible extensions. - PutVarint32(dst, kNewFile4); - PutVarint32Varint64(dst, 0u, 0ull); // level and number - PutVarint64(dst, 0ull); // file size - InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue); - PutLengthPrefixedSlice(dst, dummy_key.Encode()); // smallest - PutLengthPrefixedSlice(dst, dummy_key.Encode()); // largest - PutVarint64Varint64(dst, 0ull, 0ull); // smallest_seqno and largerst - PutVarint32(dst, CustomTag::kDeletedLogNumberHack); - std::string buf; - PutFixed64(&buf, deleted_log_number_); - PutLengthPrefixedSlice(dst, Slice(buf)); - PutVarint32(dst, CustomTag::kTerminate); - } for (const auto& deleted : deleted_files_) { PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, @@ -244,10 +218,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { uint64_t number; uint32_t path_id = 0; uint64_t file_size; - // Since this is the only forward-compatible part of the code, we hack new - // extension into this record. When we do, we set this boolean to distinguish - // the record from the normal NewFile records. - bool this_is_not_a_new_file_record = false; if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetInternalKey(input, &f.largest) && @@ -282,15 +252,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } f.marked_for_compaction = (field[0] == 1); break; - case kDeletedLogNumberHack: - // This is a hack to encode kDeletedLogNumber in a forward-compatbile - // fashion. - this_is_not_a_new_file_record = true; - if (!GetFixed64(&field, &deleted_log_number_)) { - return "deleted log number malformatted"; - } - has_deleted_log_number_ = true; - break; default: if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) { // Should not proceed if cannot understand it @@ -302,10 +263,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) { } else { return "new-file4 entry"; } - if (this_is_not_a_new_file_record) { - // Since this has nothing to do with NewFile, return immediately. - return nullptr; - } f.fd = FileDescriptor(number, path_id, file_size); new_files_.push_back(std::make_pair(level, f)); return nullptr; @@ -374,14 +331,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; - case kDeletedLogNumber: - if (GetVarint64(&input, &deleted_log_number_)) { - has_deleted_log_number_ = true; - } else { - msg = "deleted log number"; - } - break; - case kCompactPointer: if (GetLevel(&input, &level, &msg) && GetInternalKey(&input, &key)) { @@ -564,10 +513,6 @@ std::string VersionEdit::DebugString(bool hex_key) const { r.append("\n MaxColumnFamily: "); AppendNumberTo(&r, max_column_family_); } - if (has_deleted_log_number_) { - r.append("\n DeletedLogNumber: "); - AppendNumberTo(&r, deleted_log_number_); - } r.append("\n}\n"); return r; } @@ -637,9 +582,6 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const { if (has_max_column_family_) { jw << "MaxColumnFamily" << max_column_family_; } - if (has_deleted_log_number_) { - jw << "DeletedLogNumber" << deleted_log_number_; - } jw.EndObject(); diff --git a/db/version_edit.h b/db/version_edit.h index f826b11bc..391e61434 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -199,10 +199,6 @@ class VersionEdit { has_max_column_family_ = true; max_column_family_ = max_column_family; } - void SetDeletedLogNumber(uint64_t num) { - has_deleted_log_number_ = true; - deleted_log_number_ = num; - } // Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) @@ -289,8 +285,6 @@ class VersionEdit { uint64_t prev_log_number_; uint64_t next_file_number_; uint32_t max_column_family_; - // The most recent WAL log number that is deleted - uint64_t deleted_log_number_; SequenceNumber last_sequence_; bool has_comparator_; bool has_log_number_; @@ -298,7 +292,6 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; bool has_max_column_family_; - bool has_deleted_log_number_; DeletedFileSet deleted_files_; std::vector> new_files_; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 0fc4a5e75..338bb36f6 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -181,16 +181,6 @@ TEST_F(VersionEditTest, ColumnFamilyTest) { TestEncodeDecode(edit); } -TEST_F(VersionEditTest, DeletedLogNumber) { - VersionEdit edit; - edit.SetDeletedLogNumber(13); - TestEncodeDecode(edit); - - edit.Clear(); - edit.SetDeletedLogNumber(23); - TestEncodeDecode(edit); -} - } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index b66b3436e..3d340de2e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3107,7 +3107,6 @@ Status VersionSet::Recover( uint64_t log_number = 0; uint64_t previous_log_number = 0; uint32_t max_column_family = 0; - uint64_t deleted_log_number = 0; std::unordered_map builders; // add default column family @@ -3248,11 +3247,6 @@ Status VersionSet::Recover( max_column_family = edit.max_column_family_; } - if (edit.has_deleted_log_number_) { - deleted_log_number = - std::max(deleted_log_number, edit.deleted_log_number_); - } - if (edit.has_last_sequence_) { last_sequence = edit.last_sequence_; have_last_sequence = true; @@ -3275,7 +3269,6 @@ Status VersionSet::Recover( column_family_set_->UpdateMaxColumnFamily(max_column_family); - MarkDeletedLogNumber(deleted_log_number); MarkFileNumberUsed(previous_log_number); MarkFileNumberUsed(log_number); } @@ -3347,12 +3340,11 @@ Status VersionSet::Recover( "manifest_file_number is %lu, next_file_number is %lu, " "last_sequence is %lu, log_number is %lu," "prev_log_number is %lu," - "max_column_family is %u," - "deleted_log_number is %lu\n", + "max_column_family is %u\n", manifest_filename.c_str(), (unsigned long)manifest_file_number_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)log_number, (unsigned long)prev_log_number_, - column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number()); + column_family_set_->GetMaxColumnFamily()); for (auto cfd : *column_family_set_) { if (cfd->IsDropped()) { @@ -3658,10 +3650,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, if (edit.has_max_column_family_) { column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); } - - if (edit.has_deleted_log_number_) { - MarkDeletedLogNumber(edit.deleted_log_number_); - } } } file_reader.reset(); @@ -3720,11 +3708,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, printf( "next_file_number %lu last_sequence " - "%lu prev_log_number %lu max_column_family %u deleted_log_number " - "%" PRIu64 "\n", + "%lu prev_log_number %lu max_column_family %u\n", (unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)previous_log_number, - column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number()); + column_family_set_->GetMaxColumnFamily()); } return s; @@ -3739,14 +3726,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { } } -// Called only either from ::LogAndApply which is protected by mutex or during -// recovery which is single-threaded. -void VersionSet::MarkDeletedLogNumber(uint64_t number) { - if (latest_deleted_log_number_.load(std::memory_order_relaxed) < number) { - latest_deleted_log_number_.store(number, std::memory_order_relaxed); - } -} - Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? diff --git a/db/version_set.h b/db/version_set.h index 3783da314..582a5a645 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -772,10 +772,6 @@ class VersionSet { uint64_t current_next_file_number() const { return next_file_number_.load(); } - uint64_t latest_deleted_log_number() const { - return latest_deleted_log_number_.load(); - } - // Allocate and return a new file number uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } @@ -823,11 +819,6 @@ class VersionSet { // REQUIRED: this is only called during single-threaded recovery or repair. void MarkFileNumberUsed(uint64_t number); - // Mark the specified log number as deleted - // REQUIRED: this is only called during single-threaded recovery or repair, or - // from ::LogAndApply where the global mutex is held. - void MarkDeletedLogNumber(uint64_t number); - // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_t prev_log_number() const { return prev_log_number_; } @@ -925,8 +916,6 @@ class VersionSet { const std::string dbname_; const ImmutableDBOptions* const db_options_; std::atomic next_file_number_; - // Any log number equal or lower than this should be ignored during recovery. - std::atomic latest_deleted_log_number_ = {0}; uint64_t manifest_file_number_; uint64_t options_file_number_; uint64_t pending_manifest_file_number_;