Always track WAL obsoletion (#7759)
Summary: Currently, when a WAL becomes obsolete after flushing, if VersionSet::WalSet does not contain the WAL, we do not track the WAL obsoletion event in MANIFEST. But consider this case: * WAL 10 is synced, a VersionEdit is LogAndApplied to MANIFEST to log this WAL addition event, but the VersionEdit is not applied to WalSet yet since its corresponding ManifestWriter is still pending in the write queue; * Since the above ManifestWriter is blocking, the LogAndApply will block on a conditional variable and release the db mutex, so another LogAndApply can proceed to enqueue other VersionEdits concurrently; * Now flush happens, and WAL 10 becomes obsolete, although WalSet does not contain WAL 10 yet, we should call LogAndApply to enqueue a VersionEdit to indicate the obsoletion of WAL 10; * otherwise, when the queued edit indicating WAL 10 addition is logged to MANIFEST, and DB crashes and reopens, the WAL 10 might have been removed from disk, but it still exists in MANIFEST. This PR changes the behavior to: always `LogAndApply` any WAL addition or obsoletion event, without considering the order issues caused by concurrency, but when applying the edits to `WalSet`, do not add the WALs if they are already obsolete. In this approach, the logical events of WAL addition and obsoletion are always tracked in MANIFEST, so we can inspect the MANIFEST and know all the previous WAL events, but we choose to ignore certain events due to the concurrency issues such as the case above, or the case in https://github.com/facebook/rocksdb/pull/7725. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7759 Test Plan: make check Reviewed By: pdillinger Differential Revision: D25423089 Pulled By: cheng-chang fbshipit-source-id: 9cb9a7fbc1875bf954f2a42f9b6cfd6d49a7b21c
This commit is contained in:
parent
98236fb10e
commit
efe827baf0
@ -1324,11 +1324,7 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
|
||||
assert(wal.getting_synced);
|
||||
if (logs_.size() > 1) {
|
||||
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
|
||||
wal.writer->file()->GetFileSize() > 0 &&
|
||||
wal.number >= versions_->GetWalSet().GetMinWalNumberToKeep()) {
|
||||
// wal.number might < min_wal_number_to_keep when
|
||||
// the WAL becomes obsolete after flushing, but not deleted from disk
|
||||
// yet, then SyncWAL is called.
|
||||
wal.writer->file()->GetFileSize() > 0) {
|
||||
synced_wals.AddWal(wal.number,
|
||||
WalMetadata(wal.writer->file()->GetFileSize()));
|
||||
}
|
||||
|
@ -488,13 +488,9 @@ Status MemTableList::TryInstallMemtableFlushResults(
|
||||
|
||||
std::unique_ptr<VersionEdit> wal_deletion;
|
||||
if (vset->db_options()->track_and_verify_wals_in_manifest) {
|
||||
vset->SetMinWalNumberToKeepInWalSet(min_wal_number_to_keep);
|
||||
const auto& wals = vset->GetWalSet().GetWals();
|
||||
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_list.push_back(wal_deletion.get());
|
||||
}
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_list.push_back(wal_deletion.get());
|
||||
}
|
||||
|
||||
const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
|
||||
@ -758,14 +754,10 @@ Status InstallMemtableAtomicFlushResults(
|
||||
min_wal_number_to_keep =
|
||||
PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists);
|
||||
}
|
||||
vset->SetMinWalNumberToKeepInWalSet(min_wal_number_to_keep);
|
||||
const auto& wals = vset->GetWalSet().GetWals();
|
||||
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_lists.back().push_back(wal_deletion.get());
|
||||
++num_entries;
|
||||
}
|
||||
wal_deletion.reset(new VersionEdit);
|
||||
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
|
||||
edit_lists.back().push_back(wal_deletion.get());
|
||||
++num_entries;
|
||||
}
|
||||
|
||||
// Mark the version edits as an atomic group if the number of version edits
|
||||
|
@ -1229,11 +1229,6 @@ class VersionSet {
|
||||
// The returned WalSet needs to be accessed with DB mutex held.
|
||||
const WalSet& GetWalSet() const { return wals_; }
|
||||
|
||||
// Must be called with DB mutex held.
|
||||
void SetMinWalNumberToKeepInWalSet(WalNumber number) {
|
||||
return wals_.SetMinWalNumberToKeep(number);
|
||||
}
|
||||
|
||||
void TEST_CreateAndAppendVersion(ColumnFamilyData* cfd) {
|
||||
assert(cfd);
|
||||
|
||||
|
@ -106,10 +106,8 @@ std::string WalDeletion::DebugString() const {
|
||||
|
||||
Status WalSet::AddWal(const WalAddition& wal) {
|
||||
if (wal.GetLogNumber() < min_wal_number_to_keep_) {
|
||||
std::stringstream ss;
|
||||
ss << "min_wal_number_to_keep is " << min_wal_number_to_keep_ << ", so WAL "
|
||||
<< wal.GetLogNumber() << " is obsolete";
|
||||
return Status::Corruption("WalSet::AddWal", ss.str());
|
||||
// The WAL has been obsolete, ignore it.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
auto it = wals_.lower_bound(wal.GetLogNumber());
|
||||
@ -149,7 +147,10 @@ Status WalSet::AddWals(const WalAdditions& wals) {
|
||||
}
|
||||
|
||||
Status WalSet::DeleteWalsBefore(WalNumber wal) {
|
||||
wals_.erase(wals_.begin(), wals_.lower_bound(wal));
|
||||
if (wal > min_wal_number_to_keep_) {
|
||||
min_wal_number_to_keep_ = wal;
|
||||
wals_.erase(wals_.begin(), wals_.lower_bound(wal));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -143,10 +143,6 @@ class WalSet {
|
||||
|
||||
// WALs with number less than MinWalNumberToKeep should not exist in WalSet.
|
||||
WalNumber GetMinWalNumberToKeep() const { return min_wal_number_to_keep_; }
|
||||
// If number < MinWalNumberToKeep, then it's a no-op.
|
||||
void SetMinWalNumberToKeep(WalNumber number) {
|
||||
min_wal_number_to_keep_ = std::max(min_wal_number_to_keep_, number);
|
||||
}
|
||||
|
||||
const std::map<WalNumber, WalMetadata>& GetWals() const { return wals_; }
|
||||
|
||||
|
@ -85,20 +85,19 @@ TEST(WalSet, AddObsoleteWal) {
|
||||
constexpr WalNumber kNumber = 100;
|
||||
WalSet wals;
|
||||
ASSERT_OK(wals.DeleteWalsBefore(kNumber + 1));
|
||||
Status s = wals.AddWal(WalAddition(kNumber));
|
||||
ASSERT_TRUE(s.IsCorruption());
|
||||
ASSERT_TRUE(s.ToString().find("WAL 100 is obsolete") != std::string::npos);
|
||||
ASSERT_OK(wals.AddWal(WalAddition(kNumber)));
|
||||
ASSERT_TRUE(wals.GetWals().empty());
|
||||
}
|
||||
|
||||
TEST(WalSet, MinWalNumberToKeep) {
|
||||
constexpr WalNumber kNumber = 100;
|
||||
WalSet wals;
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), 0);
|
||||
wals.SetMinWalNumberToKeep(kNumber);
|
||||
ASSERT_OK(wals.DeleteWalsBefore(kNumber));
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), kNumber);
|
||||
wals.SetMinWalNumberToKeep(kNumber - 1);
|
||||
ASSERT_OK(wals.DeleteWalsBefore(kNumber - 1));
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), kNumber);
|
||||
wals.SetMinWalNumberToKeep(kNumber + 1);
|
||||
ASSERT_OK(wals.DeleteWalsBefore(kNumber + 1));
|
||||
ASSERT_EQ(wals.GetMinWalNumberToKeep(), kNumber + 1);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user