diff --git a/HISTORY.md b/HISTORY.md index 723c48773..8115c15ab 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes * Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev(). +* Fix a bug that prevents opening a DB after two consecutive crash with TransactionDB, where the first crash recovers from a corrupted WAL with kPointInTimeRecovery but the second cannot. ## 6.7.0 (01/21/2020) ### Public API Change diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index bbfecb3a6..931daeee3 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1110,10 +1110,13 @@ class DBImpl : public DB { // Recover the descriptor from persistent storage. May do a significant // amount of work to recover recently logged updates. Any changes to // be made to the descriptor are added to *edit. + // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is + // skipped. virtual Status Recover( const std::vector& column_families, bool read_only = false, bool error_if_log_file_exist = false, - bool error_if_data_exists_in_logs = false); + bool error_if_data_exists_in_logs = false, + uint64_t* recovered_seq = nullptr); virtual bool OwnTablesAndLogs() const { return true; } @@ -1355,8 +1358,10 @@ class DBImpl : public DB { JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); // REQUIRES: log_numbers are sorted in ascending order + // corrupted_log_found is set to true if we recover from a corrupted log file. Status RecoverLogFiles(const std::vector& log_numbers, - SequenceNumber* next_sequence, bool read_only); + SequenceNumber* next_sequence, bool read_only, + bool* corrupted_log_found); // The following two methods are used to flush a memtable to // storage. The first one is used at database RecoveryTime (when the diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 8027d786b..8174c9a0c 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -345,7 +345,8 @@ Status Directories::SetDirectories(Env* env, const std::string& dbname, Status DBImpl::Recover( const std::vector& column_families, bool read_only, - bool error_if_log_file_exist, bool error_if_data_exists_in_logs) { + bool error_if_log_file_exist, bool error_if_data_exists_in_logs, + uint64_t* recovered_seq) { mutex_.AssertHeld(); bool is_new_db = false; @@ -541,7 +542,12 @@ Status DBImpl::Recover( if (!logs.empty()) { // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); - s = RecoverLogFiles(logs, &next_sequence, read_only); + bool corrupted_log_found = false; + s = RecoverLogFiles(logs, &next_sequence, read_only, + &corrupted_log_found); + if (corrupted_log_found && recovered_seq != nullptr) { + *recovered_seq = next_sequence; + } if (!s.ok()) { // Clear memtables if recovery failed for (auto cfd : *versions_->GetColumnFamilySet()) { @@ -671,7 +677,8 @@ Status DBImpl::InitPersistStatsColumnFamily() { // REQUIRES: log_numbers are sorted in ascending order Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, - SequenceNumber* next_sequence, bool read_only) { + SequenceNumber* next_sequence, bool read_only, + bool* corrupted_log_found) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -973,6 +980,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, status = Status::OK(); stop_replay_for_corruption = true; corrupted_log_number = log_number; + if (corrupted_log_found != nullptr) { + *corrupted_log_found = true; + } ROCKS_LOG_INFO(immutable_db_options_.info_log, "Point in time recovered to log #%" PRIu64 " seq #%" PRIu64, @@ -1001,6 +1011,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // the corrupted log number, which means CF contains data beyond the point of // corruption. This could during PIT recovery when the WAL is corrupted and // some (but not all) CFs are flushed + // Exclude the PIT case where no log is dropped after the corruption point. + // This is to cover the case for empty logs after corrupted log, in which we + // don't reset stop_replay_for_corruption. if (stop_replay_for_corruption == true && (immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery || @@ -1396,7 +1409,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->mutex_.Lock(); // Handles create_if_missing, error_if_exists - s = impl->Recover(column_families); + uint64_t recovered_seq(kMaxSequenceNumber); + s = impl->Recover(column_families, false, false, false, &recovered_seq); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); log::Writer* new_log = nullptr; @@ -1454,9 +1468,29 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (impl->two_write_queues_) { impl->log_write_mutex_.Unlock(); } + impl->DeleteObsoleteFiles(); s = impl->directories_.GetDbDir()->Fsync(); } + if (s.ok()) { + // In WritePrepared there could be gap in sequence numbers. This breaks + // the trick we use in kPointInTimeRecovery which assumes the first seq in + // the log right after the corrupted log is one larger than the last seq + // we read from the logs. To let this trick keep working, we add a dummy + // entry with the expected sequence to the first log right after recovery. + // In non-WritePrepared case also the new log after recovery could be + // empty, and thus missing the consecutive seq hint to distinguish + // middle-log corruption to corrupted-log-remained-after-recovery. This + // case also will be addressed by a dummy write. + if (recovered_seq != kMaxSequenceNumber) { + WriteBatch empty_batch; + WriteBatchInternal::SetSequence(&empty_batch, recovered_seq); + WriteOptions write_options; + uint64_t log_used, log_size; + log::Writer* log_writer = impl->logs_.back().writer; + s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size); + } + } } if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { // try to read format version but no need to fail Open() even if it fails diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index d465077b0..4a3b0e035 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -29,7 +29,7 @@ DBImplSecondary::~DBImplSecondary() {} Status DBImplSecondary::Recover( const std::vector& column_families, bool /*readonly*/, bool /*error_if_log_file_exist*/, - bool /*error_if_data_exists_in_logs*/) { + bool /*error_if_data_exists_in_logs*/, uint64_t*) { mutex_.AssertHeld(); JobContext job_context(0); diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 99887f55b..cde664c58 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -78,7 +78,8 @@ class DBImplSecondary : public DBImpl { // and log_readers_ to facilitate future operations. Status Recover(const std::vector& column_families, bool read_only, bool error_if_log_file_exist, - bool error_if_data_exists_in_logs) override; + bool error_if_data_exists_in_logs, + uint64_t* = nullptr) override; // Implementations of the DB interface using DB::Get; diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index 5242c6260..6859c1aeb 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -158,12 +158,13 @@ class PessimisticTransactionDB : public TransactionDB { friend class WritePreparedTxnDB; friend class WritePreparedTxnDBMock; friend class WriteUnpreparedTxn; + friend class TransactionTest_DoubleCrashInRecovery_Test; friend class TransactionTest_DoubleEmptyWrite_Test; friend class TransactionTest_DuplicateKeys_Test; friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; - friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; + friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; TransactionLockMgr lock_mgr_; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index da1edd185..142799f5d 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -6116,6 +6116,92 @@ TEST_P(TransactionTest, ReseekOptimization) { delete txn0; } +// After recovery in kPointInTimeRecovery mode, the corrupted log file remains +// there. The new log files should be still read succesfully during recovery of +// the 2nd crash. +TEST_P(TransactionTest, DoubleCrashInRecovery) { + for (const bool write_after_recovery : {false, true}) { + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + ReOpen(); + std::string cf_name = "two"; + ColumnFamilyOptions cf_options; + ColumnFamilyHandle* cf_handle = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle)); + + // Add a prepare entry to prevent the older logs from being deleted. + WriteOptions write_options; + TransactionOptions txn_options; + Transaction* txn = db->BeginTransaction(write_options, txn_options); + ASSERT_OK(txn->SetName("xid")); + ASSERT_OK(txn->Put(Slice("foo-prepare"), Slice("bar-prepare"))); + ASSERT_OK(txn->Prepare()); + + FlushOptions flush_ops; + db->Flush(flush_ops); + // Now we have a log that cannot be deleted + + ASSERT_OK(db->Put(write_options, cf_handle, "foo1", "bar1")); + // Flush only the 2nd cf + db->Flush(flush_ops, cf_handle); + + // The value is large enough to be touched by the corruption we ingest + // below. + std::string large_value(400, ' '); + // key/value not touched by corruption + ASSERT_OK(db->Put(write_options, "foo2", "bar2")); + // key/value touched by corruption + ASSERT_OK(db->Put(write_options, "foo3", large_value)); + // key/value not touched by corruption + ASSERT_OK(db->Put(write_options, "foo4", "bar4")); + + db->FlushWAL(true); + DBImpl* db_impl = reinterpret_cast(db->GetRootDB()); + uint64_t wal_file_id = db_impl->TEST_LogfileNumber(); + std::string fname = LogFileName(dbname, wal_file_id); + reinterpret_cast(db)->TEST_Crash(); + delete txn; + delete cf_handle; + delete db; + db = nullptr; + + // Corrupt the last log file in the middle, so that it is not corrupted + // in the tail. + std::string file_content; + ASSERT_OK(ReadFileToString(env, fname, &file_content)); + file_content[400] = 'h'; + file_content[401] = 'a'; + ASSERT_OK(env->DeleteFile(fname)); + ASSERT_OK(WriteStringToFile(env, file_content, fname)); + + // Recover from corruption + std::vector handles; + std::vector column_families; + column_families.push_back(ColumnFamilyDescriptor(kDefaultColumnFamilyName, + ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("two", ColumnFamilyOptions())); + ASSERT_OK(ReOpenNoDelete(column_families, &handles)); + + if (write_after_recovery) { + // Write data to the log right after the corrupted log + ASSERT_OK(db->Put(write_options, "foo5", large_value)); + } + + // Persist data written to WAL during recovery or by the last Put + db->FlushWAL(true); + // 2nd crash to recover while having a valid log after the corrupted one. + ASSERT_OK(ReOpenNoDelete(column_families, &handles)); + assert(db != nullptr); + txn = db->GetTransactionByName("xid"); + ASSERT_TRUE(txn != nullptr); + ASSERT_OK(txn->Commit()); + delete txn; + for (auto handle : handles) { + delete handle; + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/utilities/transactions/transaction_test.h b/utilities/transactions/transaction_test.h index 62e033282..73d6b2b84 100644 --- a/utilities/transactions/transaction_test.h +++ b/utilities/transactions/transaction_test.h @@ -129,7 +129,7 @@ class TransactionTestBase : public ::testing::Test { } else { s = OpenWithStackableDB(cfs, handles); } - assert(db != nullptr); + assert(!s.ok() || db != nullptr); return s; }