Recover to exact latest seqno of data committed to MANIFEST (#9305)

Summary:
The LastSequence field in the MANIFEST file is the baseline seqno for a recovered DB. Recovering WAL entries might cause the recovered DB's seqno to advance above this baseline, but the recovered DB will never use a smaller seqno.

Before this PR, we were writing the DB's seqno at the time of LogAndApply() as the LastSequence value. This works in the sense that it is a large enough baseline for the recovered DB that it'll never overwrite any records in existing SST files. At the same time, it's arbitrarily larger than what's needed. This behavior comes from LevelDB, where there was no tracking of largest seqno in an SST file.

Now we know the largest seqno of newly written SST files, so we can write an exact value in LastSequence that actually reflects the largest seqno in any file referred to by the MANIFEST. This is primarily useful for correctness testing with unsynced data loss, where the recovered DB's seqno needs to indicate what records were recovered.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9305

Test Plan:
- https://github.com/facebook/rocksdb/issues/9338 adds crash-recovery correctness testing coverage for WAL disabled use cases
- https://github.com/facebook/rocksdb/issues/9357 will extend that testing to cover file ingestion
- Added assertion at end of LogAndApply() for `VersionSet::descriptor_last_sequence_` consistency with files
- Manually tested upgrade/downgrade compatibility with a custom crash test that randomly picks between a `db_stress` built with and without this PR (for old code it must run with `-disable_wal=0`)

Reviewed By: riversand963

Differential Revision: D33182770

Pulled By: ajkr

fbshipit-source-id: 0bfafaf685f347cc8cb0e1d62e0186340a738f7d
This commit is contained in:
Andrew Kryczka 2022-01-05 16:00:41 -08:00 committed by Facebook GitHub Bot
parent fe31dc53ca
commit b860a42158
5 changed files with 73 additions and 20 deletions

View File

@ -418,11 +418,17 @@ class VersionEdit {
temperature, oldest_blob_file_number, oldest_ancester_time,
file_creation_time, file_checksum, file_checksum_func_name,
min_timestamp, max_timestamp));
if (!HasLastSequence() || largest_seqno > GetLastSequence()) {
SetLastSequence(largest_seqno);
}
}
void AddFile(int level, const FileMetaData& f) {
assert(f.fd.smallest_seqno <= f.fd.largest_seqno);
new_files_.emplace_back(level, f);
if (!HasLastSequence() || f.fd.largest_seqno > GetLastSequence()) {
SetLastSequence(f.fd.largest_seqno);
}
}
// Retrieve the table files added as well as their associated levels.

View File

@ -442,6 +442,14 @@ void VersionEditHandler::CheckIterationResult(const log::Reader& reader,
last_seq > version_set_->last_sequence_.load()) {
version_set_->last_sequence_.store(last_seq);
}
if (last_seq != kMaxSequenceNumber &&
last_seq > version_set_->descriptor_last_sequence_) {
// This is the maximum last sequence of all `VersionEdit`s iterated. It
// may be greater than the maximum `largest_seqno` of all files in case
// the newest data referred to by the MANIFEST has been dropped or had its
// sequence number zeroed through compaction.
version_set_->descriptor_last_sequence_ = last_seq;
}
version_set_->prev_log_number_ = version_edit_params_.prev_log_number_;
}
}
@ -597,6 +605,11 @@ Status VersionEditHandler::ExtractInfoFromVersionEdit(ColumnFamilyData* cfd,
edit.min_log_number_to_keep_);
}
if (edit.has_last_sequence_) {
// `VersionEdit::last_sequence_`s are assumed to be non-decreasing. This
// is legacy behavior that cannot change without breaking downgrade
// compatibility.
assert(!version_edit_params_.has_last_sequence_ ||
version_edit_params_.last_sequence_ <= edit.last_sequence_);
version_edit_params_.SetLastSequence(edit.last_sequence_);
}
if (!version_edit_params_.has_prev_log_number_) {

View File

@ -4158,9 +4158,16 @@ Status VersionSet::ProcessManifestWrites(
autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
// Tracking `max_last_sequence` is needed to ensure we write
// `VersionEdit::last_sequence_`s in non-decreasing order according to the
// recovery code's requirement. It also allows us to defer updating
// `descriptor_last_sequence_` until the apply phase, after the log phase
// succeeds.
SequenceNumber max_last_sequence = descriptor_last_sequence_;
if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
// No group commits for column family add or drop
LogAndApplyCFHelper(first_writer.edit_list.front());
LogAndApplyCFHelper(first_writer.edit_list.front(), &max_last_sequence);
batch_edits.push_back(first_writer.edit_list.front());
} else {
auto it = manifest_writers_.cbegin();
@ -4248,7 +4255,8 @@ Status VersionSet::ProcessManifestWrites(
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
Status s = LogAndApplyHelper(last_writer->cfd, builder, e,
&max_last_sequence, mu);
if (!s.ok()) {
// free up the allocated memory
for (auto v : versions) {
@ -4520,9 +4528,11 @@ Status VersionSet::ProcessManifestWrites(
if (first_writer.edit_list.front()->is_column_family_add_) {
assert(batch_edits.size() == 1);
assert(new_cf_options != nullptr);
assert(max_last_sequence == descriptor_last_sequence_);
CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
} else if (first_writer.edit_list.front()->is_column_family_drop_) {
assert(batch_edits.size() == 1);
assert(max_last_sequence == descriptor_last_sequence_);
first_writer.cfd->SetDropped();
first_writer.cfd->UnrefAndTryDelete();
} else {
@ -4562,6 +4572,8 @@ Status VersionSet::ProcessManifestWrites(
AppendVersion(cfd, versions[i]);
}
}
assert(max_last_sequence >= descriptor_last_sequence_);
descriptor_last_sequence_ = max_last_sequence;
manifest_file_number_ = pending_manifest_file_number_;
manifest_file_size_ = new_manifest_file_size;
prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
@ -4628,6 +4640,23 @@ Status VersionSet::ProcessManifestWrites(
pending_manifest_file_number_ = 0;
#ifndef NDEBUG
// This is here kind of awkwardly because there's no other consistency
// checks on `VersionSet`'s updates for the new `Version`s. We might want
// to move it to a dedicated function, or remove it if we gain enough
// confidence in `descriptor_last_sequence_`.
if (s.ok()) {
for (const auto* v : versions) {
const auto* vstorage = v->storage_info();
for (int level = 0; level < vstorage->num_levels(); ++level) {
for (const auto& file : vstorage->LevelFiles(level)) {
assert(file->fd.largest_seqno <= descriptor_last_sequence_);
}
}
}
}
#endif // NDEBUG
// wake up all the waiting writers
while (true) {
ManifestWriter* ready = manifest_writers_.front();
@ -4751,16 +4780,13 @@ Status VersionSet::LogAndApply(
new_cf_options);
}
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit,
SequenceNumber* max_last_sequence) {
assert(max_last_sequence != nullptr);
assert(edit->IsColumnFamilyManipulation());
edit->SetNextFile(next_file_number_.load());
// The log might have data that is not visible to memtbale and hence have not
// updated the last_sequence_ yet. It is also possible that the log has is
// expecting some new data that is not written yet. Since LastSequence is an
// upper bound on the sequence, it is ok to record
// last_allocated_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
assert(!edit->HasLastSequence());
edit->SetLastSequence(*max_last_sequence);
if (edit->is_column_family_drop_) {
// if we drop column family, we have to make sure to save max column family,
// so that we don't reuse existing ID
@ -4770,12 +4796,14 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, VersionEdit* edit,
SequenceNumber* max_last_sequence,
InstrumentedMutex* mu) {
#ifdef NDEBUG
(void)cfd;
#endif
mu->AssertHeld();
assert(!edit->IsColumnFamilyManipulation());
assert(max_last_sequence != nullptr);
if (edit->has_log_number_) {
assert(edit->log_number_ >= cfd->GetLogNumber());
@ -4786,13 +4814,11 @@ Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
edit->SetPrevLogNumber(prev_log_number_);
}
edit->SetNextFile(next_file_number_.load());
// The log might have data that is not visible to memtbale and hence have not
// updated the last_sequence_ yet. It is also possible that the log has is
// expecting some new data that is not written yet. Since LastSequence is an
// upper bound on the sequence, it is ok to record
// last_allocated_sequence_ as the last sequence.
edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
: last_sequence_);
if (edit->HasLastSequence() && edit->GetLastSequence() > *max_last_sequence) {
*max_last_sequence = edit->GetLastSequence();
} else {
edit->SetLastSequence(*max_last_sequence);
}
// The builder can be nullptr only if edit is WAL manipulation,
// because WAL edits do not need to be applied to versions,
@ -5431,6 +5457,9 @@ Status VersionSet::WriteCurrentStateToManifest(
if (!full_history_ts_low.empty()) {
edit.SetFullHistoryTsLow(full_history_ts_low);
}
edit.SetLastSequence(descriptor_last_sequence_);
std::string record;
if (!edit.EncodeTo(&record)) {
return Status::Corruption(

View File

@ -1386,6 +1386,9 @@ class VersionSet {
// the memtable but when using two write queues it could also indicate the
// last sequence in the WAL visible to reads.
std::atomic<uint64_t> last_sequence_;
// The last sequence number of data committed to the descriptor (manifest
// file).
SequenceNumber descriptor_last_sequence_ = 0;
// The last seq that is already allocated. It is applicable only when we have
// two write queues. In that case seq might or might not have appreated in
// memtable but it is expected to appear in the WAL.
@ -1434,9 +1437,11 @@ class VersionSet {
bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options);
void LogAndApplyCFHelper(VersionEdit* edit);
void LogAndApplyCFHelper(VersionEdit* edit,
SequenceNumber* max_last_sequence);
Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b,
VersionEdit* edit, InstrumentedMutex* mu);
VersionEdit* edit, SequenceNumber* max_last_sequence,
InstrumentedMutex* mu);
};
// ReactiveVersionSet represents a collection of versions of the column

View File

@ -1476,7 +1476,7 @@ public class RocksDBTest {
try (final RocksDB db = RocksDB.open(options, dbPath)) {
final RocksDB.LiveFiles livefiles = db.getLiveFiles(true);
assertThat(livefiles).isNotNull();
assertThat(livefiles.manifestFileSize).isEqualTo(57);
assertThat(livefiles.manifestFileSize).isEqualTo(59);
assertThat(livefiles.files.size()).isEqualTo(3);
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");