Revert "Skip deleted WALs during recovery"
Summary:
This reverts commit 73f21a7b21
.
It breaks compatibility. When created a DB using a build with this new change, opening the DB and reading the data will fail with this error:
"Corruption: Can't access /000000.sst: IO error: while stat a file for size: /tmp/xxxx/000000.sst: No such file or directory"
This is because the dummy AddFile4 entry generated by the new code will be treated as a real entry by an older build. The older build will think there is a real file with number 0, but there isn't such a file.
Closes https://github.com/facebook/rocksdb/pull/3762
Differential Revision: D7730035
Pulled By: siying
fbshipit-source-id: f2051859eff20ef1837575ecb1e1bb96b3751e77
This commit is contained in:
parent
a8a28da215
commit
d5afa73789
@ -72,23 +72,19 @@ TEST_F(DBFlushTest, SyncFail) {
|
|||||||
auto* cfd =
|
auto* cfd =
|
||||||
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
|
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
|
||||||
->cfd();
|
->cfd();
|
||||||
|
int refs_before = cfd->current()->TEST_refs();
|
||||||
FlushOptions flush_options;
|
FlushOptions flush_options;
|
||||||
flush_options.wait = false;
|
flush_options.wait = false;
|
||||||
ASSERT_OK(dbfull()->Flush(flush_options));
|
ASSERT_OK(dbfull()->Flush(flush_options));
|
||||||
// Flush installs a new super-version. Get the ref count after that.
|
|
||||||
auto current_before = cfd->current();
|
|
||||||
int refs_before = cfd->current()->TEST_refs();
|
|
||||||
fault_injection_env->SetFilesystemActive(false);
|
fault_injection_env->SetFilesystemActive(false);
|
||||||
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
|
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
|
||||||
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
|
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
|
||||||
fault_injection_env->SetFilesystemActive(true);
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
// Now the background job will do the flush; wait for it.
|
|
||||||
dbfull()->TEST_WaitForFlushMemTable();
|
dbfull()->TEST_WaitForFlushMemTable();
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
ASSERT_EQ("", FilesPerLevel()); // flush failed.
|
ASSERT_EQ("", FilesPerLevel()); // flush failed.
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
// Backgroun flush job should release ref count to current version.
|
// Flush job should release ref count to current version.
|
||||||
ASSERT_EQ(current_before, cfd->current());
|
|
||||||
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
|
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
|
||||||
Destroy(options);
|
Destroy(options);
|
||||||
}
|
}
|
||||||
|
@ -350,8 +350,6 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
|
|||||||
}; // namespace
|
}; // namespace
|
||||||
|
|
||||||
// Delete obsolete files and log status and information of file deletion
|
// Delete obsolete files and log status and information of file deletion
|
||||||
// Note: All WAL files must be deleted through this function (unelss they are
|
|
||||||
// archived) to ensure that maniefest is updated properly.
|
|
||||||
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
|
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
|
||||||
FileType type, uint64_t number) {
|
FileType type, uint64_t number) {
|
||||||
Status file_deletion_status;
|
Status file_deletion_status;
|
||||||
@ -359,16 +357,6 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
|
|||||||
file_deletion_status =
|
file_deletion_status =
|
||||||
DeleteSSTFile(&immutable_db_options_, fname);
|
DeleteSSTFile(&immutable_db_options_, fname);
|
||||||
} else {
|
} else {
|
||||||
if (type == kLogFile) {
|
|
||||||
// Before deleting the file, mark file as deleted in the manifest
|
|
||||||
VersionEdit edit;
|
|
||||||
edit.SetDeletedLogNumber(number);
|
|
||||||
auto edit_cfd = versions_->GetColumnFamilySet()->GetDefault();
|
|
||||||
auto edit_cf_opts = edit_cfd->GetLatestMutableCFOptions();
|
|
||||||
mutex_.Lock();
|
|
||||||
versions_->LogAndApply(edit_cfd, *edit_cf_opts, &edit, &mutex_);
|
|
||||||
mutex_.Unlock();
|
|
||||||
}
|
|
||||||
file_deletion_status = env_->DeleteFile(fname);
|
file_deletion_status = env_->DeleteFile(fname);
|
||||||
}
|
}
|
||||||
TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
|
TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
|
||||||
|
@ -532,13 +532,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|||||||
bool flushed = false;
|
bool flushed = false;
|
||||||
uint64_t corrupted_log_number = kMaxSequenceNumber;
|
uint64_t corrupted_log_number = kMaxSequenceNumber;
|
||||||
for (auto log_number : log_numbers) {
|
for (auto log_number : log_numbers) {
|
||||||
if (log_number <= versions_->latest_deleted_log_number()) {
|
|
||||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
|
||||||
"Skipping log #%" PRIu64
|
|
||||||
" since it is not newer than latest deleted log #%" PRIu64,
|
|
||||||
log_number, versions_->latest_deleted_log_number());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// The previous incarnation may not have written any MANIFEST
|
// The previous incarnation may not have written any MANIFEST
|
||||||
// records after allocating this log number. So we manually
|
// records after allocating this log number. So we manually
|
||||||
// update the file number allocation counter in VersionSet.
|
// update the file number allocation counter in VersionSet.
|
||||||
|
@ -451,9 +451,8 @@ class SpecialEnv : public EnvWrapper {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status NewSequentialFile(const std::string& f,
|
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
|
||||||
unique_ptr<SequentialFile>* r,
|
const EnvOptions& soptions) override {
|
||||||
const EnvOptions& soptions) override {
|
|
||||||
class CountingFile : public SequentialFile {
|
class CountingFile : public SequentialFile {
|
||||||
public:
|
public:
|
||||||
CountingFile(unique_ptr<SequentialFile>&& target,
|
CountingFile(unique_ptr<SequentialFile>&& target,
|
||||||
|
@ -20,106 +20,6 @@ class DBWALTest : public DBTestBase {
|
|||||||
DBWALTest() : DBTestBase("/db_wal_test") {}
|
DBWALTest() : DBTestBase("/db_wal_test") {}
|
||||||
};
|
};
|
||||||
|
|
||||||
// A SpecialEnv enriched to give more insight about deleted files
|
|
||||||
class EnrichedSpecialEnv : public SpecialEnv {
|
|
||||||
public:
|
|
||||||
explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
|
|
||||||
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
|
|
||||||
const EnvOptions& soptions) override {
|
|
||||||
InstrumentedMutexLock l(&env_mutex_);
|
|
||||||
if (f == skipped_wal) {
|
|
||||||
deleted_wal_reopened = true;
|
|
||||||
if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
|
|
||||||
f.compare(largetest_deleted_wal) <= 0) {
|
|
||||||
gap_in_wals = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return SpecialEnv::NewSequentialFile(f, r, soptions);
|
|
||||||
}
|
|
||||||
Status DeleteFile(const std::string& fname) override {
|
|
||||||
if (IsWAL(fname)) {
|
|
||||||
deleted_wal_cnt++;
|
|
||||||
InstrumentedMutexLock l(&env_mutex_);
|
|
||||||
// If this is the first WAL, remember its name and skip deleting it. We
|
|
||||||
// remember its name partly because the application might attempt to
|
|
||||||
// delete the file again.
|
|
||||||
if (skipped_wal.size() != 0 && skipped_wal != fname) {
|
|
||||||
if (largetest_deleted_wal.size() == 0 ||
|
|
||||||
largetest_deleted_wal.compare(fname) < 0) {
|
|
||||||
largetest_deleted_wal = fname;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
skipped_wal = fname;
|
|
||||||
return Status::OK();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return SpecialEnv::DeleteFile(fname);
|
|
||||||
}
|
|
||||||
bool IsWAL(const std::string& fname) {
|
|
||||||
// printf("iswal %s\n", fname.c_str());
|
|
||||||
return fname.compare(fname.size() - 3, 3, "log") == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
InstrumentedMutex env_mutex_;
|
|
||||||
// the wal whose actual delete was skipped by the env
|
|
||||||
std::string skipped_wal = "";
|
|
||||||
// the largest WAL that was requested to be deleted
|
|
||||||
std::string largetest_deleted_wal = "";
|
|
||||||
// number of WALs that were successfully deleted
|
|
||||||
std::atomic<size_t> deleted_wal_cnt = {0};
|
|
||||||
// the WAL whose delete from fs was skipped is reopened during recovery
|
|
||||||
std::atomic<bool> deleted_wal_reopened = {false};
|
|
||||||
// whether a gap in the WALs was detected during recovery
|
|
||||||
std::atomic<bool> gap_in_wals = {false};
|
|
||||||
};
|
|
||||||
|
|
||||||
class DBWALTestWithEnrichedEnv : public DBTestBase {
|
|
||||||
public:
|
|
||||||
DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
|
|
||||||
enriched_env_ = new EnrichedSpecialEnv(env_->target());
|
|
||||||
auto options = CurrentOptions();
|
|
||||||
options.env = enriched_env_;
|
|
||||||
Reopen(options);
|
|
||||||
delete env_;
|
|
||||||
// to be deleted by the parent class
|
|
||||||
env_ = enriched_env_;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected:
|
|
||||||
EnrichedSpecialEnv* enriched_env_;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Test that the recovery would successfully avoid the gaps between the logs.
|
|
||||||
// One known scenario that could cause this is that the application issue the
|
|
||||||
// WAL deletion out of order. For the sake of simplicity in the test, here we
|
|
||||||
// create the gap by manipulating the env to skip deletion of the first WAL but
|
|
||||||
// not the ones after it.
|
|
||||||
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
|
|
||||||
auto options = last_options_;
|
|
||||||
// To cause frequent WAL deletion
|
|
||||||
options.write_buffer_size = 128;
|
|
||||||
Reopen(options);
|
|
||||||
|
|
||||||
WriteOptions writeOpt = WriteOptions();
|
|
||||||
for (int i = 0; i < 128 * 5; i++) {
|
|
||||||
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
|
|
||||||
}
|
|
||||||
FlushOptions fo;
|
|
||||||
fo.wait = true;
|
|
||||||
ASSERT_OK(db_->Flush(fo));
|
|
||||||
|
|
||||||
// some wals are deleted
|
|
||||||
ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
|
|
||||||
// but not the first one
|
|
||||||
ASSERT_NE(0, enriched_env_->skipped_wal.size());
|
|
||||||
|
|
||||||
// Test that the WAL that was not deleted will be skipped during recovery
|
|
||||||
options = last_options_;
|
|
||||||
Reopen(options);
|
|
||||||
ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
|
|
||||||
ASSERT_FALSE(enriched_env_->gap_in_wals);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(DBWALTest, WAL) {
|
TEST_F(DBWALTest, WAL) {
|
||||||
do {
|
do {
|
||||||
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
|
||||||
@ -991,7 +891,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
|
|||||||
|
|
||||||
// Record the offset at this point
|
// Record the offset at this point
|
||||||
Env* env = options.env;
|
Env* env = options.env;
|
||||||
uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
|
int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1;
|
||||||
std::string fname = LogFileName(dbname_, wal_file_id);
|
std::string fname = LogFileName(dbname_, wal_file_id);
|
||||||
uint64_t offset_to_corrupt;
|
uint64_t offset_to_corrupt;
|
||||||
ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
|
ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));
|
||||||
|
@ -1413,12 +1413,8 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
|
|||||||
// fit in L3 but will overlap with compaction so will be added
|
// fit in L3 but will overlap with compaction so will be added
|
||||||
// to L2 but a compaction will trivially move it to L3
|
// to L2 but a compaction will trivially move it to L3
|
||||||
// and break LSM consistency
|
// and break LSM consistency
|
||||||
static std::atomic<bool> called = {false};
|
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
|
||||||
if (!called) {
|
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
|
||||||
called = true;
|
|
||||||
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
|
|
||||||
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
@ -30,7 +30,6 @@ enum Tag {
|
|||||||
kNewFile = 7,
|
kNewFile = 7,
|
||||||
// 8 was used for large value refs
|
// 8 was used for large value refs
|
||||||
kPrevLogNumber = 9,
|
kPrevLogNumber = 9,
|
||||||
kDeletedLogNumber = 10,
|
|
||||||
|
|
||||||
// these are new formats divergent from open source leveldb
|
// these are new formats divergent from open source leveldb
|
||||||
kNewFile2 = 100,
|
kNewFile2 = 100,
|
||||||
@ -45,11 +44,6 @@ enum Tag {
|
|||||||
enum CustomTag {
|
enum CustomTag {
|
||||||
kTerminate = 1, // The end of customized fields
|
kTerminate = 1, // The end of customized fields
|
||||||
kNeedCompaction = 2,
|
kNeedCompaction = 2,
|
||||||
// Since Manifest is not entirely currently forward-compatible, and the only
|
|
||||||
// forward-compatbile part is the CutsomtTag of kNewFile, we currently encode
|
|
||||||
// kDeletedLogNumber as part of a CustomTag as a hack. This should be removed
|
|
||||||
// when manifest becomes forward-comptabile.
|
|
||||||
kDeletedLogNumberHack = 3,
|
|
||||||
kPathId = 65,
|
kPathId = 65,
|
||||||
};
|
};
|
||||||
// If this bit for the custom tag is set, opening DB should fail if
|
// If this bit for the custom tag is set, opening DB should fail if
|
||||||
@ -69,14 +63,12 @@ void VersionEdit::Clear() {
|
|||||||
last_sequence_ = 0;
|
last_sequence_ = 0;
|
||||||
next_file_number_ = 0;
|
next_file_number_ = 0;
|
||||||
max_column_family_ = 0;
|
max_column_family_ = 0;
|
||||||
deleted_log_number_ = 0;
|
|
||||||
has_comparator_ = false;
|
has_comparator_ = false;
|
||||||
has_log_number_ = false;
|
has_log_number_ = false;
|
||||||
has_prev_log_number_ = false;
|
has_prev_log_number_ = false;
|
||||||
has_next_file_number_ = false;
|
has_next_file_number_ = false;
|
||||||
has_last_sequence_ = false;
|
has_last_sequence_ = false;
|
||||||
has_max_column_family_ = false;
|
has_max_column_family_ = false;
|
||||||
has_deleted_log_number_ = false;
|
|
||||||
deleted_files_.clear();
|
deleted_files_.clear();
|
||||||
new_files_.clear();
|
new_files_.clear();
|
||||||
column_family_ = 0;
|
column_family_ = 0;
|
||||||
@ -105,24 +97,6 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
|
|||||||
if (has_max_column_family_) {
|
if (has_max_column_family_) {
|
||||||
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
|
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
|
||||||
}
|
}
|
||||||
if (has_deleted_log_number_) {
|
|
||||||
// TODO(myabandeh): uncomment me when manifest is forward-compatible
|
|
||||||
// PutVarint32Varint64(dst, kDeletedLogNumber, deleted_log_number_);
|
|
||||||
// Since currently manifest is not forward compatible we encode this entry
|
|
||||||
// disguised as a kNewFile4 entry which has forward-compatible extensions.
|
|
||||||
PutVarint32(dst, kNewFile4);
|
|
||||||
PutVarint32Varint64(dst, 0u, 0ull); // level and number
|
|
||||||
PutVarint64(dst, 0ull); // file size
|
|
||||||
InternalKey dummy_key(Slice("dummy_key"), 0ull, ValueType::kTypeValue);
|
|
||||||
PutLengthPrefixedSlice(dst, dummy_key.Encode()); // smallest
|
|
||||||
PutLengthPrefixedSlice(dst, dummy_key.Encode()); // largest
|
|
||||||
PutVarint64Varint64(dst, 0ull, 0ull); // smallest_seqno and largerst
|
|
||||||
PutVarint32(dst, CustomTag::kDeletedLogNumberHack);
|
|
||||||
std::string buf;
|
|
||||||
PutFixed64(&buf, deleted_log_number_);
|
|
||||||
PutLengthPrefixedSlice(dst, Slice(buf));
|
|
||||||
PutVarint32(dst, CustomTag::kTerminate);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const auto& deleted : deleted_files_) {
|
for (const auto& deleted : deleted_files_) {
|
||||||
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
|
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
|
||||||
@ -244,10 +218,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
|||||||
uint64_t number;
|
uint64_t number;
|
||||||
uint32_t path_id = 0;
|
uint32_t path_id = 0;
|
||||||
uint64_t file_size;
|
uint64_t file_size;
|
||||||
// Since this is the only forward-compatible part of the code, we hack new
|
|
||||||
// extension into this record. When we do, we set this boolean to distinguish
|
|
||||||
// the record from the normal NewFile records.
|
|
||||||
bool this_is_not_a_new_file_record = false;
|
|
||||||
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
|
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
|
||||||
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
|
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
|
||||||
GetInternalKey(input, &f.largest) &&
|
GetInternalKey(input, &f.largest) &&
|
||||||
@ -282,15 +252,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
|||||||
}
|
}
|
||||||
f.marked_for_compaction = (field[0] == 1);
|
f.marked_for_compaction = (field[0] == 1);
|
||||||
break;
|
break;
|
||||||
case kDeletedLogNumberHack:
|
|
||||||
// This is a hack to encode kDeletedLogNumber in a forward-compatbile
|
|
||||||
// fashion.
|
|
||||||
this_is_not_a_new_file_record = true;
|
|
||||||
if (!GetFixed64(&field, &deleted_log_number_)) {
|
|
||||||
return "deleted log number malformatted";
|
|
||||||
}
|
|
||||||
has_deleted_log_number_ = true;
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
|
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
|
||||||
// Should not proceed if cannot understand it
|
// Should not proceed if cannot understand it
|
||||||
@ -302,10 +263,6 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
|
|||||||
} else {
|
} else {
|
||||||
return "new-file4 entry";
|
return "new-file4 entry";
|
||||||
}
|
}
|
||||||
if (this_is_not_a_new_file_record) {
|
|
||||||
// Since this has nothing to do with NewFile, return immediately.
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
f.fd = FileDescriptor(number, path_id, file_size);
|
f.fd = FileDescriptor(number, path_id, file_size);
|
||||||
new_files_.push_back(std::make_pair(level, f));
|
new_files_.push_back(std::make_pair(level, f));
|
||||||
return nullptr;
|
return nullptr;
|
||||||
@ -374,14 +331,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case kDeletedLogNumber:
|
|
||||||
if (GetVarint64(&input, &deleted_log_number_)) {
|
|
||||||
has_deleted_log_number_ = true;
|
|
||||||
} else {
|
|
||||||
msg = "deleted log number";
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case kCompactPointer:
|
case kCompactPointer:
|
||||||
if (GetLevel(&input, &level, &msg) &&
|
if (GetLevel(&input, &level, &msg) &&
|
||||||
GetInternalKey(&input, &key)) {
|
GetInternalKey(&input, &key)) {
|
||||||
@ -564,10 +513,6 @@ std::string VersionEdit::DebugString(bool hex_key) const {
|
|||||||
r.append("\n MaxColumnFamily: ");
|
r.append("\n MaxColumnFamily: ");
|
||||||
AppendNumberTo(&r, max_column_family_);
|
AppendNumberTo(&r, max_column_family_);
|
||||||
}
|
}
|
||||||
if (has_deleted_log_number_) {
|
|
||||||
r.append("\n DeletedLogNumber: ");
|
|
||||||
AppendNumberTo(&r, deleted_log_number_);
|
|
||||||
}
|
|
||||||
r.append("\n}\n");
|
r.append("\n}\n");
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
@ -637,9 +582,6 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
|
|||||||
if (has_max_column_family_) {
|
if (has_max_column_family_) {
|
||||||
jw << "MaxColumnFamily" << max_column_family_;
|
jw << "MaxColumnFamily" << max_column_family_;
|
||||||
}
|
}
|
||||||
if (has_deleted_log_number_) {
|
|
||||||
jw << "DeletedLogNumber" << deleted_log_number_;
|
|
||||||
}
|
|
||||||
|
|
||||||
jw.EndObject();
|
jw.EndObject();
|
||||||
|
|
||||||
|
@ -199,10 +199,6 @@ class VersionEdit {
|
|||||||
has_max_column_family_ = true;
|
has_max_column_family_ = true;
|
||||||
max_column_family_ = max_column_family;
|
max_column_family_ = max_column_family;
|
||||||
}
|
}
|
||||||
void SetDeletedLogNumber(uint64_t num) {
|
|
||||||
has_deleted_log_number_ = true;
|
|
||||||
deleted_log_number_ = num;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the specified file at the specified number.
|
// Add the specified file at the specified number.
|
||||||
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
||||||
@ -289,8 +285,6 @@ class VersionEdit {
|
|||||||
uint64_t prev_log_number_;
|
uint64_t prev_log_number_;
|
||||||
uint64_t next_file_number_;
|
uint64_t next_file_number_;
|
||||||
uint32_t max_column_family_;
|
uint32_t max_column_family_;
|
||||||
// The most recent WAL log number that is deleted
|
|
||||||
uint64_t deleted_log_number_;
|
|
||||||
SequenceNumber last_sequence_;
|
SequenceNumber last_sequence_;
|
||||||
bool has_comparator_;
|
bool has_comparator_;
|
||||||
bool has_log_number_;
|
bool has_log_number_;
|
||||||
@ -298,7 +292,6 @@ class VersionEdit {
|
|||||||
bool has_next_file_number_;
|
bool has_next_file_number_;
|
||||||
bool has_last_sequence_;
|
bool has_last_sequence_;
|
||||||
bool has_max_column_family_;
|
bool has_max_column_family_;
|
||||||
bool has_deleted_log_number_;
|
|
||||||
|
|
||||||
DeletedFileSet deleted_files_;
|
DeletedFileSet deleted_files_;
|
||||||
std::vector<std::pair<int, FileMetaData>> new_files_;
|
std::vector<std::pair<int, FileMetaData>> new_files_;
|
||||||
|
@ -181,16 +181,6 @@ TEST_F(VersionEditTest, ColumnFamilyTest) {
|
|||||||
TestEncodeDecode(edit);
|
TestEncodeDecode(edit);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(VersionEditTest, DeletedLogNumber) {
|
|
||||||
VersionEdit edit;
|
|
||||||
edit.SetDeletedLogNumber(13);
|
|
||||||
TestEncodeDecode(edit);
|
|
||||||
|
|
||||||
edit.Clear();
|
|
||||||
edit.SetDeletedLogNumber(23);
|
|
||||||
TestEncodeDecode(edit);
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -3122,7 +3122,6 @@ Status VersionSet::Recover(
|
|||||||
uint64_t log_number = 0;
|
uint64_t log_number = 0;
|
||||||
uint64_t previous_log_number = 0;
|
uint64_t previous_log_number = 0;
|
||||||
uint32_t max_column_family = 0;
|
uint32_t max_column_family = 0;
|
||||||
uint64_t deleted_log_number = 0;
|
|
||||||
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
|
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
|
||||||
|
|
||||||
// add default column family
|
// add default column family
|
||||||
@ -3263,11 +3262,6 @@ Status VersionSet::Recover(
|
|||||||
max_column_family = edit.max_column_family_;
|
max_column_family = edit.max_column_family_;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (edit.has_deleted_log_number_) {
|
|
||||||
deleted_log_number =
|
|
||||||
std::max(deleted_log_number, edit.deleted_log_number_);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (edit.has_last_sequence_) {
|
if (edit.has_last_sequence_) {
|
||||||
last_sequence = edit.last_sequence_;
|
last_sequence = edit.last_sequence_;
|
||||||
have_last_sequence = true;
|
have_last_sequence = true;
|
||||||
@ -3290,7 +3284,6 @@ Status VersionSet::Recover(
|
|||||||
|
|
||||||
column_family_set_->UpdateMaxColumnFamily(max_column_family);
|
column_family_set_->UpdateMaxColumnFamily(max_column_family);
|
||||||
|
|
||||||
MarkDeletedLogNumber(deleted_log_number);
|
|
||||||
MarkFileNumberUsed(previous_log_number);
|
MarkFileNumberUsed(previous_log_number);
|
||||||
MarkFileNumberUsed(log_number);
|
MarkFileNumberUsed(log_number);
|
||||||
}
|
}
|
||||||
@ -3362,12 +3355,11 @@ Status VersionSet::Recover(
|
|||||||
"manifest_file_number is %lu, next_file_number is %lu, "
|
"manifest_file_number is %lu, next_file_number is %lu, "
|
||||||
"last_sequence is %lu, log_number is %lu,"
|
"last_sequence is %lu, log_number is %lu,"
|
||||||
"prev_log_number is %lu,"
|
"prev_log_number is %lu,"
|
||||||
"max_column_family is %u,"
|
"max_column_family is %u\n",
|
||||||
"deleted_log_number is %lu\n",
|
|
||||||
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
|
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
|
||||||
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
|
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
|
||||||
(unsigned long)log_number, (unsigned long)prev_log_number_,
|
(unsigned long)log_number, (unsigned long)prev_log_number_,
|
||||||
column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
|
column_family_set_->GetMaxColumnFamily());
|
||||||
|
|
||||||
for (auto cfd : *column_family_set_) {
|
for (auto cfd : *column_family_set_) {
|
||||||
if (cfd->IsDropped()) {
|
if (cfd->IsDropped()) {
|
||||||
@ -3673,10 +3665,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
|
|||||||
if (edit.has_max_column_family_) {
|
if (edit.has_max_column_family_) {
|
||||||
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
|
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (edit.has_deleted_log_number_) {
|
|
||||||
MarkDeletedLogNumber(edit.deleted_log_number_);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
file_reader.reset();
|
file_reader.reset();
|
||||||
@ -3735,11 +3723,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
|
|||||||
|
|
||||||
printf(
|
printf(
|
||||||
"next_file_number %lu last_sequence "
|
"next_file_number %lu last_sequence "
|
||||||
"%lu prev_log_number %lu max_column_family %u deleted_log_number "
|
"%lu prev_log_number %lu max_column_family %u\n",
|
||||||
"%" PRIu64 "\n",
|
|
||||||
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
|
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
|
||||||
(unsigned long)previous_log_number,
|
(unsigned long)previous_log_number,
|
||||||
column_family_set_->GetMaxColumnFamily(), latest_deleted_log_number());
|
column_family_set_->GetMaxColumnFamily());
|
||||||
}
|
}
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
@ -3754,14 +3741,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Called only either from ::LogAndApply which is protected by mutex or during
|
|
||||||
// recovery which is single-threaded.
|
|
||||||
void VersionSet::MarkDeletedLogNumber(uint64_t number) {
|
|
||||||
if (latest_deleted_log_number_.load(std::memory_order_relaxed) < number) {
|
|
||||||
latest_deleted_log_number_.store(number, std::memory_order_relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Status VersionSet::WriteSnapshot(log::Writer* log) {
|
Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||||
// TODO: Break up into multiple records to reduce memory usage on recovery?
|
// TODO: Break up into multiple records to reduce memory usage on recovery?
|
||||||
|
|
||||||
|
@ -802,10 +802,6 @@ class VersionSet {
|
|||||||
|
|
||||||
uint64_t current_next_file_number() const { return next_file_number_.load(); }
|
uint64_t current_next_file_number() const { return next_file_number_.load(); }
|
||||||
|
|
||||||
uint64_t latest_deleted_log_number() const {
|
|
||||||
return latest_deleted_log_number_.load();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Allocate and return a new file number
|
// Allocate and return a new file number
|
||||||
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
|
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
|
||||||
|
|
||||||
@ -853,11 +849,6 @@ class VersionSet {
|
|||||||
// REQUIRED: this is only called during single-threaded recovery or repair.
|
// REQUIRED: this is only called during single-threaded recovery or repair.
|
||||||
void MarkFileNumberUsed(uint64_t number);
|
void MarkFileNumberUsed(uint64_t number);
|
||||||
|
|
||||||
// Mark the specified log number as deleted
|
|
||||||
// REQUIRED: this is only called during single-threaded recovery or repair, or
|
|
||||||
// from ::LogAndApply where the global mutex is held.
|
|
||||||
void MarkDeletedLogNumber(uint64_t number);
|
|
||||||
|
|
||||||
// Return the log file number for the log file that is currently
|
// Return the log file number for the log file that is currently
|
||||||
// being compacted, or zero if there is no such log file.
|
// being compacted, or zero if there is no such log file.
|
||||||
uint64_t prev_log_number() const { return prev_log_number_; }
|
uint64_t prev_log_number() const { return prev_log_number_; }
|
||||||
@ -955,8 +946,6 @@ class VersionSet {
|
|||||||
const std::string dbname_;
|
const std::string dbname_;
|
||||||
const ImmutableDBOptions* const db_options_;
|
const ImmutableDBOptions* const db_options_;
|
||||||
std::atomic<uint64_t> next_file_number_;
|
std::atomic<uint64_t> next_file_number_;
|
||||||
// Any log number equal or lower than this should be ignored during recovery.
|
|
||||||
std::atomic<uint64_t> latest_deleted_log_number_ = {0};
|
|
||||||
uint64_t manifest_file_number_;
|
uint64_t manifest_file_number_;
|
||||||
uint64_t options_file_number_;
|
uint64_t options_file_number_;
|
||||||
uint64_t pending_manifest_file_number_;
|
uint64_t pending_manifest_file_number_;
|
||||||
|
Loading…
Reference in New Issue
Block a user