Do not track obsolete WALs in MANIFEST even if they are synced (#7725)
Summary: Consider the case: 1. All column families are flushed, so all WALs become obsolete, but no WAL is removed from disk yet because the removal is asynchronous, a VersionEdit is written to MANIFEST indicating that WALs before a certain WAL number are obsolete, let's say this number is 3; 2. `SyncWAL` is called, so all the on-disk WALs are synced, and if track_and_verify_wal_in_manifest=true, the WALs will be tracked in MANIFEST, let's say the WAL numbers are 1 and 2; 3. DB crashes; 4. During DB recovery, when replaying MANIFEST, we first see that WAL with number < 3 are obsolete, then we see that WAL 1 and 2 are synced, so according to current implementation of `WalSet`, the `WalSet` will be recovered to include WAL 1 and 2; 5. WAL 1 and 2 are asynchronously deleted from disk, then the WAL verification algorithm fails with `Corruption: missing WAL`. The above case is reproduced in a new unit test `DBBasicTestTrackWal::DoNotTrackObsoleteWal`. The fix is to maintain the upper bound of the obsolete WAL numbers, any WAL with number less than the maintained number is considered to be obsolete, so shouldn't be tracked even if they are later synced. The number is maintained in `WalSet`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7725 Test Plan: 1. a new unit test `DBBasicTestTrackWal::DoNotTrackObsoleteWal` is added. 2. run `make crash_test` on devserver. Reviewed By: riversand963 Differential Revision: D25238914 Pulled By: cheng-chang fbshipit-source-id: f5dccd57c3d89f19565ec5731f2d42f06d272b72
This commit is contained in:
parent
11c4be2222
commit
07030c6f4a
@ -2534,6 +2534,64 @@ TEST_F(DBBasicTest, ManifestChecksumMismatch) {
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
class DBBasicTestTrackWal : public DBTestBase,
|
||||
public testing::WithParamInterface<bool> {
|
||||
public:
|
||||
DBBasicTestTrackWal()
|
||||
: DBTestBase("/db_basic_test_track_wal", /*env_do_fsync=*/false) {}
|
||||
|
||||
int CountWalFiles() {
|
||||
VectorLogPtr log_files;
|
||||
dbfull()->GetSortedWalFiles(log_files);
|
||||
return static_cast<int>(log_files.size());
|
||||
};
|
||||
};
|
||||
|
||||
TEST_P(DBBasicTestTrackWal, DoNotTrackObsoleteWal) {
|
||||
// If a WAL becomes obsolete after flushing, but is not deleted from disk yet,
|
||||
// then if SyncWAL is called afterwards, the obsolete WAL should not be
|
||||
// tracked in MANIFEST.
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.track_and_verify_wals_in_manifest = true;
|
||||
options.atomic_flush = GetParam();
|
||||
|
||||
DestroyAndReopen(options);
|
||||
CreateAndReopenWithCF({"cf"}, options);
|
||||
ASSERT_EQ(handles_.size(), 2); // default, cf
|
||||
// Do not delete WALs.
|
||||
ASSERT_OK(db_->DisableFileDeletions());
|
||||
constexpr int n = 10;
|
||||
std::vector<std::unique_ptr<LogFile>> wals(n);
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
// Generate a new WAL for each key-value.
|
||||
const int cf = i % 2;
|
||||
ASSERT_OK(db_->GetCurrentWalFile(&wals[i]));
|
||||
ASSERT_OK(Put(cf, "k" + std::to_string(i), "v" + std::to_string(i)));
|
||||
ASSERT_OK(Flush({0, 1}));
|
||||
}
|
||||
ASSERT_EQ(CountWalFiles(), n);
|
||||
// Since all WALs are obsolete, no WAL should be tracked in MANIFEST.
|
||||
ASSERT_OK(db_->SyncWAL());
|
||||
|
||||
// Manually delete all WALs.
|
||||
Close();
|
||||
for (const auto& wal : wals) {
|
||||
ASSERT_OK(env_->DeleteFile(LogFileName(dbname_, wal->LogNumber())));
|
||||
}
|
||||
|
||||
// If SyncWAL tracks the obsolete WALs in MANIFEST,
|
||||
// reopen will fail because the WALs are missing from disk.
|
||||
ASSERT_OK(TryReopenWithColumnFamilies({"default", "cf"}, options));
|
||||
Destroy(options);
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBBasicTestTrackWal, DBBasicTestTrackWal,
|
||||
testing::Bool());
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
class DBBasicTestMultiGet : public DBTestBase {
|
||||
public:
|
||||
DBBasicTestMultiGet(std::string test_dir, int num_cfs, bool compressed_cache,
|
||||
|
@ -1324,7 +1324,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
|
||||
assert(wal.getting_synced);
|
||||
if (logs_.size() > 1) {
|
||||
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
|
||||
wal.writer->file()->GetFileSize() > 0) {
|
||||
wal.writer->file()->GetFileSize() > 0 &&
|
||||
wal.number >= versions_->GetWalSet().GetMinWalNumberToKeep()) {
|
||||
// wal.number might < min_wal_number_to_keep when
|
||||
// the WAL becomes obsolete after flushing, but not deleted from disk
|
||||
// yet, then SyncWAL is called.
|
||||
synced_wals.AddWal(wal.number,
|
||||
WalMetadata(wal.writer->file()->GetFileSize()));
|
||||
}
|
||||
|
@ -1563,6 +1563,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
InstrumentedMutexLock wl(&impl->log_write_mutex_);
|
||||
impl->logfile_number_ = new_log_number;
|
||||
assert(new_log != nullptr);
|
||||
assert(impl->logs_.empty());
|
||||
impl->logs_.emplace_back(new_log_number, new_log);
|
||||
}
|
||||
|
||||
|
@ -488,6 +488,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest) {
|
||||
vset->SetMinWalNumberToKeepInWalSet(min_wal_number_to_keep);
|
||||
const auto& wals = vset->GetWalSet().GetWals();
|
||||
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
@ -752,13 +753,14 @@ Status InstallMemtableAtomicFlushResults(
|
||||
}
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest &&
|
||||
!vset->GetWalSet().GetWals().empty()) {
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest) {
|
||||
if (!vset->db_options()->allow_2pc) {
|
||||
min_wal_number_to_keep =
|
||||
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
|
||||
}
|
||||
if (min_wal_number_to_keep > vset->GetWalSet().GetWals().begin()->first) {
|
||||
vset->SetMinWalNumberToKeepInWalSet(min_wal_number_to_keep);
|
||||
const auto& wals = vset->GetWalSet().GetWals();
|
||||
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_lists.back().push_back(wal_deletion.get());
|
||||
|
@ -1229,6 +1229,11 @@ class VersionSet {
|
||||
// The returned WalSet needs to be accessed with DB mutex held.
|
||||
const WalSet& GetWalSet() const { return wals_; }
|
||||
|
||||
// Must be called with DB mutex held.
|
||||
void SetMinWalNumberToKeepInWalSet(WalNumber number) {
|
||||
return wals_.SetMinWalNumberToKeep(number);
|
||||
}
|
||||
|
||||
void TEST_CreateAndAppendVersion(ColumnFamilyData* cfd) {
|
||||
assert(cfd);
|
||||
|
||||
|
@ -105,12 +105,19 @@ std::string WalDeletion::DebugString() const {
|
||||
}
|
||||
|
||||
Status WalSet::AddWal(const WalAddition& wal) {
|
||||
if (wal.GetLogNumber() < min_wal_number_to_keep_) {
|
||||
std::stringstream ss;
|
||||
ss << "min_wal_number_to_keep is " << min_wal_number_to_keep_ << ", so WAL "
|
||||
<< wal.GetLogNumber() << " is obsolete";
|
||||
return Status::Corruption("WalSet::AddWal", ss.str());
|
||||
}
|
||||
|
||||
auto it = wals_.lower_bound(wal.GetLogNumber());
|
||||
bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
|
||||
if (existing && !wal.GetMetadata().HasSyncedSize()) {
|
||||
std::stringstream ss;
|
||||
ss << "WAL " << wal.GetLogNumber() << " is created more than once";
|
||||
return Status::Corruption("WalSet", ss.str());
|
||||
return Status::Corruption("WalSet::AddWal", ss.str());
|
||||
}
|
||||
// If the WAL has synced size, it must >= the previous size.
|
||||
if (wal.GetMetadata().HasSyncedSize() && existing &&
|
||||
@ -120,7 +127,7 @@ Status WalSet::AddWal(const WalAddition& wal) {
|
||||
std::stringstream ss;
|
||||
ss << "WAL " << wal.GetLogNumber()
|
||||
<< " must not have smaller synced size than previous one";
|
||||
return Status::Corruption("WalSet", ss.str());
|
||||
return Status::Corruption("WalSet::AddWal", ss.str());
|
||||
}
|
||||
if (existing) {
|
||||
it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
|
||||
@ -146,7 +153,10 @@ Status WalSet::DeleteWalsBefore(WalNumber wal) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void WalSet::Reset() { wals_.clear(); }
|
||||
void WalSet::Reset() {
|
||||
wals_.clear();
|
||||
min_wal_number_to_keep_ = 0;
|
||||
}
|
||||
|
||||
Status WalSet::CheckWals(
|
||||
Env* env,
|
||||
|
@ -141,6 +141,13 @@ class WalSet {
|
||||
// Resets the internal state.
|
||||
void Reset();
|
||||
|
||||
// WALs with number less than MinWalNumberToKeep should not exist in WalSet.
|
||||
WalNumber GetMinWalNumberToKeep() const { return min_wal_number_to_keep_; }
|
||||
// If number < MinWalNumberToKeep, then it's a no-op.
|
||||
void SetMinWalNumberToKeep(WalNumber number) {
|
||||
min_wal_number_to_keep_ = std::max(min_wal_number_to_keep_, number);
|
||||
}
|
||||
|
||||
const std::map<WalNumber, WalMetadata>& GetWals() const { return wals_; }
|
||||
|
||||
// Checks whether there are missing or corrupted WALs.
|
||||
@ -155,6 +162,9 @@ class WalSet {
|
||||
|
||||
private:
|
||||
std::map<WalNumber, WalMetadata> wals_;
|
||||
// WAL number < min_wal_number_to_keep_ should not exist in wals_.
|
||||
// It's monotonically increasing, in-memory only, not written to MANIFEST.
|
||||
WalNumber min_wal_number_to_keep_ = 0;
|
||||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -81,6 +81,27 @@ TEST(WalSet, DeleteAllWals) {
|
||||
ASSERT_OK(wals.DeleteWalsBefore(kMaxWalNumber + 1));
|
||||
}
|
||||
|
||||
TEST(WalSet, AddObsoleteWal) {
|
||||
constexpr WalNumber kNumber = 100;
|
||||
WalSet wals;
|
||||
ASSERT_OK(wals.DeleteWalsBefore(kNumber + 1));
|
||||
Status s = wals.AddWal(WalAddition(kNumber));
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("WAL 100 is obsolete") != std::string::npos);
|
||||
}
|
||||
|
||||
TEST(WalSet, MinWalNumberToKeep) {
|
||||
constexpr WalNumber kNumber = 100;
|
||||
WalSet wals;
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), 0);
|
||||
wals.SetMinWalNumberToKeep(kNumber);
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), kNumber);
|
||||
wals.SetMinWalNumberToKeep(kNumber - 1);
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), kNumber);
|
||||
wals.SetMinWalNumberToKeep(kNumber + 1);
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), kNumber + 1);
|
||||
}
|
||||
|
||||
class WalSetTest : public DBTestBase {
|
||||
public:
|
||||
WalSetTest() : DBTestBase("WalSetTest", /* env_do_fsync */ true) {}
|
||||
|
Loading…
Reference in New Issue
Block a user