Persist the new MANIFEST after successfully syncing the new WAL during recovery (#9922)

Summary:
In case of non-TransactionDB and avoid_flush_during_recovery = true, RocksDB won't
flush the data from WAL to L0 for all column families if possible. As a
result, not all column families can increase their log_numbers, and
min_log_number_to_keep won't change.
For transaction DB (.allow_2pc), even with the flush, there may be old WAL files that it must not delete because they can contain data of uncommitted transactions and min_log_number_to_keep won't change.
If we persist a new MANIFEST with
advanced log_numbers for some column families, then during a second
crash after persisting the MANIFEST, RocksDB will see some column
families' log_numbers larger than the corrupted wal, and the "column family inconsistency" error will be hit, causing recovery to fail.

As a solution, RocksDB will persist the new MANIFEST after successfully syncing the new WAL.
If a future recovery starts from the new MANIFEST, then it means the new WAL is successfully synced. Due to the sentinel empty write batch at the beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
If future recovery starts from the old MANIFEST, it means the writing the new MANIFEST failed. We won't have the "SST ahead of WAL" error.
Currently, RocksDB DB::Open() may creates and writes to two new MANIFEST files even before recovery succeeds. This PR buffers the edits in a structure and writes to a new MANIFEST after recovery is successful

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9922

Test Plan:
1. Update unit tests to fail without this change
2. make crast_test -j

Branch with unit test and no fix  https://github.com/facebook/rocksdb/pull/9942 to keep track of unit test (without fix)

Reviewed By: riversand963

Differential Revision: D36043701

Pulled By: akankshamahajan15

fbshipit-source-id: 5760970db0a0920fb73d3c054a4155733500acd9
This commit is contained in:
Akanksha Mahajan 2022-06-01 10:52:26 -07:00 committed by Yanqin Jin
parent 8244f13448
commit 405a35f8c3
8 changed files with 236 additions and 113 deletions

View File

@ -1065,7 +1065,7 @@ INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
// The combination of corrupting a WAL and injecting an error during subsequent
// re-open exposes the bug of prematurely persisting a new MANIFEST with
// advanced ColumnFamilyData::log_number.
TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
CloseDb();
Options options;
options.track_and_verify_wals_in_manifest =
@ -1107,7 +1107,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
// number. TEST_SwitchMemtable makes sure WALs are not synced and test can
// corrupt un-sync WAL.
for (int i = 0; i < 2; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value"));
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
"value" + std::to_string(i)));
ASSERT_OK(dbimpl->TEST_SwitchMemtable());
}
@ -1188,6 +1189,23 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
{
options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
// Verify that data is not lost.
{
std::string v;
ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
ASSERT_EQ("dontcare", v);
v.clear();
ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
ASSERT_EQ("value" + std::to_string(0), v);
// Since it's corrupting second last wal, below key is not found.
v.clear();
ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
Status::NotFound());
}
for (auto* h : handles) {
delete h;
}
@ -1219,8 +1237,7 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest, DISABLED_CrashDuringRecovery) {
// The combination of corrupting a WAL and injecting an error during subsequent
// re-open exposes the bug of prematurely persisting a new MANIFEST with
// advanced ColumnFamilyData::log_number.
TEST_P(CrashDuringRecoveryWithCorruptionTest,
DISABLED_TxnDbCrashDuringRecovery) {
TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
CloseDb();
Options options;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
@ -1271,13 +1288,14 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
// Put and flush cf0
for (int i = 0; i < 2; ++i) {
ASSERT_OK(txn_db->Put(WriteOptions(), "dontcare", "value"));
ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
"value" + std::to_string(i)));
ASSERT_OK(dbimpl->TEST_SwitchMemtable());
}
// Put cf1
txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
ASSERT_OK(txn->Put(handles[1], "foo1", "value"));
ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
ASSERT_OK(txn->Commit());
delete txn;
@ -1337,7 +1355,6 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
std::vector<uint64_t> file_nums;
GetSortedWalFiles(file_nums);
size_t size = file_nums.size();
assert(size >= 2);
uint64_t log_num = file_nums[size - 1];
CorruptFileWithTruncation(FileType::kWalFile, log_num);
}
@ -1354,6 +1371,27 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
{
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
&handles, &txn_db));
// Verify that data is not lost.
{
std::string v;
// Key not visible since it's not committed.
ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
Status::NotFound());
v.clear();
ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
ASSERT_EQ("value" + std::to_string(0), v);
// Last WAL is corrupted which contains two keys below.
v.clear();
ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
Status::NotFound());
v.clear();
ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
Status::NotFound());
}
for (auto* h : handles) {
delete h;
}
@ -1396,8 +1434,7 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
// The combination of corrupting a WAL and injecting an error during subsequent
// re-open exposes the bug of prematurely persisting a new MANIFEST with
// advanced ColumnFamilyData::log_number.
TEST_P(CrashDuringRecoveryWithCorruptionTest,
DISABLED_CrashDuringRecoveryWithFlush) {
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
CloseDb();
Options options;
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
@ -1430,7 +1467,8 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
// Write to default_cf and flush this cf several times to advance wal
// number.
for (int i = 0; i < 2; ++i) {
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i), "value"));
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
"value" + std::to_string(i)));
ASSERT_OK(db_->Flush(FlushOptions()));
}
@ -1483,6 +1521,25 @@ TEST_P(CrashDuringRecoveryWithCorruptionTest,
{
options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
// Verify that data is not lost.
{
std::string v;
ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
ASSERT_EQ("dontcare", v);
for (int i = 0; i < 2; ++i) {
v.clear();
ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
ASSERT_EQ("value" + std::to_string(i), v);
}
// Since it's corrupting last wal after Flush, below key is not found.
v.clear();
ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
Status::NotFound());
}
for (auto* h : handles) {
delete h;
}

View File

@ -1240,6 +1240,39 @@ class DBImpl : public DB {
std::atomic<bool> shutting_down_;
// RecoveryContext struct stores the context about version edits along
// with corresponding column_family_data and column_family_options.
class RecoveryContext {
public:
~RecoveryContext() {
for (auto& edit_list : edit_lists_) {
for (auto* edit : edit_list) {
delete edit;
}
}
}
void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
assert(cfd != nullptr);
if (map_.find(cfd->GetID()) == map_.end()) {
uint32_t size = static_cast<uint32_t>(map_.size());
map_.emplace(cfd->GetID(), size);
cfds_.emplace_back(cfd);
mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions());
edit_lists_.emplace_back(autovector<VersionEdit*>());
}
uint32_t i = map_[cfd->GetID()];
edit_lists_[i].emplace_back(new VersionEdit(edit));
}
std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index;
autovector<ColumnFamilyData*> cfds_;
autovector<const MutableCFOptions*> mutable_cf_opts_;
autovector<autovector<VersionEdit*>> edit_lists_;
// files_to_delete_ contains sst files
std::unordered_set<std::string> files_to_delete_;
};
// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@ -1356,16 +1389,19 @@ class DBImpl : public DB {
// be made to the descriptor are added to *edit.
// recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
// skipped.
// recovery_ctx stores the context about version edits and all those
// edits are persisted to new Manifest after successfully syncing the new WAL.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_wal_file_exists = false,
bool error_if_data_exists_in_wals = false,
uint64_t* recovered_seq = nullptr);
uint64_t* recovered_seq = nullptr,
RecoveryContext* recovery_ctx = nullptr);
virtual bool OwnTablesAndLogs() const { return true; }
// Set DB identity file, and write DB ID to manifest if necessary.
Status SetDBId(bool read_only);
Status SetDBId(bool read_only, RecoveryContext* recovery_ctx);
// REQUIRES: db mutex held when calling this function, but the db mutex can
// be released and re-acquired. Db mutex will be held when the function
@ -1374,12 +1410,15 @@ class DBImpl : public DB {
// not referenced in the MANIFEST (e.g.
// 1. It's best effort recovery;
// 2. The VersionEdits referencing the SST files are appended to
// MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are
// RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
// still not synced to MANIFEST during recovery.)
// We delete these SST files. In the
// It stores the SST files to be deleted in RecoveryContext. In the
// meantime, we find out the largest file number present in the paths, and
// bump up the version set's next_file_number_ to be 1 + largest_file_number.
Status DeleteUnreferencedSstFiles();
// recovery_ctx stores the context about version edits and files to be
// deleted. All those edits are persisted to new Manifest after successfully
// syncing the new WAL.
Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx);
// SetDbSessionId() should be called in the constuctor DBImpl()
// to ensure that db_session_id_ gets updated every time the DB is opened
@ -1389,6 +1428,14 @@ class DBImpl : public DB {
Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family,
const Slice& ts) const;
// recovery_ctx stores the context about version edits and
// LogAndApplyForRecovery persist all those edits to new Manifest after
// successfully syncing new WAL.
// LogAndApplyForRecovery should be called only once during recovery and it
// should be called when RocksDB writes to a first new MANIFEST since this
// recovery.
Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);
private:
friend class DB;
friend class ErrorHandler;
@ -1645,7 +1692,8 @@ class DBImpl : public DB {
// corrupted_log_found is set to true if we recover from a corrupted log file.
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_log_found);
bool* corrupted_log_found,
RecoveryContext* recovery_ctx);
// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the

View File

@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC(
return min_log_number_to_keep;
}
Status DBImpl::SetDBId(bool read_only) {
Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) {
Status s;
// Happens when immutable_db_options_.write_dbid_to_manifest is set to true
// the very first time.
@ -890,14 +890,14 @@ Status DBImpl::SetDBId(bool read_only) {
}
s = GetDbIdentityFromIdentityFile(&db_id_);
if (immutable_db_options_.write_dbid_to_manifest && s.ok()) {
assert(!read_only);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
VersionEdit edit;
edit.SetDBId(db_id_);
Options options;
MutableCFOptions mutable_cf_options(options);
versions_->db_id_ = db_id_;
s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &edit, &mutex_, nullptr,
/* new_descriptor_log */ false);
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
}
} else if (!read_only) {
s = SetIdentityFile(env_, dbname_, db_id_);
@ -905,7 +905,7 @@ Status DBImpl::SetDBId(bool read_only) {
return s;
}
Status DBImpl::DeleteUnreferencedSstFiles() {
Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
std::vector<std::string> paths;
paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator)));
@ -925,7 +925,6 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
uint64_t next_file_number = versions_->current_next_file_number();
uint64_t largest_file_number = next_file_number;
std::set<std::string> files_to_delete;
Status s;
for (const auto& path : paths) {
std::vector<std::string> files;
@ -943,8 +942,9 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
const std::string normalized_fpath = path + fname;
largest_file_number = std::max(largest_file_number, number);
if (type == kTableFile && number >= next_file_number &&
files_to_delete.find(normalized_fpath) == files_to_delete.end()) {
files_to_delete.insert(normalized_fpath);
recovery_ctx->files_to_delete_.find(normalized_fpath) ==
recovery_ctx->files_to_delete_.end()) {
recovery_ctx->files_to_delete_.emplace(normalized_fpath);
}
}
}
@ -961,21 +961,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
assert(versions_->GetColumnFamilySet());
ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
assert(default_cfd);
s = versions_->LogAndApply(
default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
directories_.GetDbDir(), /*new_descriptor_log*/ false);
if (!s.ok()) {
return s;
}
mutex_.Unlock();
for (const auto& fname : files_to_delete) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
recovery_ctx->UpdateVersionEdits(default_cfd, edit);
return s;
}

View File

@ -399,7 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
uint64_t* recovered_seq) {
uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
bool is_new_db = false;
@ -524,9 +524,9 @@ Status DBImpl::Recover(
return s;
}
}
s = SetDBId(read_only);
s = SetDBId(read_only, recovery_ctx);
if (s.ok() && !read_only) {
s = DeleteUnreferencedSstFiles();
s = DeleteUnreferencedSstFiles(recovery_ctx);
}
if (immutable_db_options_.paranoid_checks && s.ok()) {
@ -541,10 +541,6 @@ Status DBImpl::Recover(
}
}
}
// DB mutex is already held
if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
s = InitPersistStatsColumnFamily();
}
std::vector<std::string> files_in_wal_dir;
if (s.ok()) {
@ -614,7 +610,10 @@ Status DBImpl::Recover(
WalNumber max_wal_number =
versions_->GetWalSet().GetWals().rbegin()->first;
edit.DeleteWalsBefore(max_wal_number + 1);
s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
assert(recovery_ctx != nullptr);
assert(versions_->GetColumnFamilySet() != nullptr);
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), edit);
}
if (!s.ok()) {
return s;
@ -650,8 +649,8 @@ Status DBImpl::Recover(
std::sort(wals.begin(), wals.end());
bool corrupted_wal_found = false;
s = RecoverLogFiles(wals, &next_sequence, read_only,
&corrupted_wal_found);
s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
recovery_ctx);
if (corrupted_wal_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
@ -830,10 +829,30 @@ Status DBImpl::InitPersistStatsColumnFamily() {
return s;
}
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
mutex_.AssertHeld();
assert(versions_->descriptor_log_ == nullptr);
Status s = versions_->LogAndApply(
recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_,
recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
mutex_.Unlock();
for (const auto& fname : recovery_ctx.files_to_delete_) {
s = env_->DeleteFile(fname);
if (!s.ok()) {
break;
}
}
mutex_.Lock();
}
return s;
}
// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
bool* corrupted_wal_found) {
bool* corrupted_wal_found,
RecoveryContext* recovery_ctx) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@ -1291,44 +1310,36 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_wal_number + 1);
assert(recovery_ctx != nullptr);
autovector<ColumnFamilyData*> cfds;
autovector<const MutableCFOptions*> cf_opts;
autovector<autovector<VersionEdit*>> edit_lists;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
cfds.push_back(cfd);
cf_opts.push_back(cfd->GetLatestMutableCFOptions());
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
edit_lists.push_back({&iter->second});
recovery_ctx->UpdateVersionEdits(cfd, iter->second);
}
std::unique_ptr<VersionEdit> wal_deletion;
if (flushed) {
wal_deletion = std::make_unique<VersionEdit>();
VersionEdit wal_deletion;
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
wal_deletion->DeleteWalsBefore(max_wal_number + 1);
wal_deletion.DeleteWalsBefore(max_wal_number + 1);
}
if (!allow_2pc()) {
// In non-2pc mode, flushing the memtables of the column families
// means we can advance min_log_number_to_keep.
wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1);
wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
}
edit_lists.back().push_back(wal_deletion.get());
assert(versions_->GetColumnFamilySet() != nullptr);
recovery_ctx->UpdateVersionEdits(
versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
}
// write MANIFEST with update
status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
directories_.GetDbDir(),
/*new_descriptor_log=*/true);
}
}
if (status.ok()) {
if (data_seen && !flushed) {
status = RestoreAliveLogFiles(wal_numbers);
} else {
// If there's no data in the WAL, or we flushed all the data, still
} else if (!wal_numbers.empty()) { // If there's no data in the WAL, or we
// flushed all the data, still
// truncate the log file. If the process goes into a crash loop before
// the file is deleted, the preallocated space will never get freed.
const bool truncate = !read_only;
@ -1724,6 +1735,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
*dbptr = nullptr;
assert(handles);
handles->clear();
size_t max_write_buffer_size = 0;
@ -1766,11 +1778,13 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
RecoveryContext recovery_ctx;
impl->mutex_.Lock();
// Handles create_if_missing, error_if_exists
uint64_t recovered_seq(kMaxSequenceNumber);
s = impl->Recover(column_families, false, false, false, &recovered_seq);
s = impl->Recover(column_families, false, false, false, &recovered_seq,
&recovery_ctx);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
@ -1787,40 +1801,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
// set column family handles
for (auto cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (cfd != nullptr) {
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
impl->NewThreadStatusCfInfo(cfd);
} else {
if (db_options.create_missing_column_families) {
// missing column family, create it
ColumnFamilyHandle* handle;
impl->mutex_.Unlock();
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
impl->mutex_.Lock();
if (s.ok()) {
handles->push_back(handle);
} else {
break;
}
} else {
s = Status::InvalidArgument("Column family not found", cf.name);
break;
}
}
}
}
if (s.ok()) {
SuperVersionContext sv_context(/* create_superversion */ true);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
impl->InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
}
sv_context.Clean();
if (impl->two_write_queues_) {
impl->log_write_mutex_.Lock();
}
@ -1860,6 +1840,53 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
}
if (s.ok()) {
s = impl->LogAndApplyForRecovery(recovery_ctx);
}
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
impl->mutex_.AssertHeld();
s = impl->InitPersistStatsColumnFamily();
}
if (s.ok()) {
// set column family handles
for (auto cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (cfd != nullptr) {
handles->push_back(
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
impl->NewThreadStatusCfInfo(cfd);
} else {
if (db_options.create_missing_column_families) {
// missing column family, create it
ColumnFamilyHandle* handle = nullptr;
impl->mutex_.Unlock();
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
impl->mutex_.Lock();
if (s.ok()) {
handles->push_back(handle);
} else {
break;
}
} else {
s = Status::InvalidArgument("Column family not found", cf.name);
break;
}
}
}
}
if (s.ok()) {
SuperVersionContext sv_context(/* create_superversion */ true);
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
impl->InstallSuperVersionAndScheduleWork(
cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
}
sv_context.Clean();
}
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
// try to read format version
s = impl->PersistentStatsProcessFormatVersion();

View File

@ -33,7 +33,8 @@ DBImplSecondary::~DBImplSecondary() {}
Status DBImplSecondary::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_wal_file_exists*/,
bool /*error_if_data_exists_in_wals*/, uint64_t*) {
bool /*error_if_data_exists_in_wals*/, uint64_t*,
RecoveryContext* /*recovery_ctx*/) {
mutex_.AssertHeld();
JobContext job_context(0);

View File

@ -81,8 +81,8 @@ class DBImplSecondary : public DBImpl {
// and log_readers_ to facilitate future operations.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, bool error_if_wal_file_exists,
bool error_if_data_exists_in_wals,
uint64_t* = nullptr) override;
bool error_if_data_exists_in_wals, uint64_t* = nullptr,
RecoveryContext* recovery_ctx = nullptr) override;
// Implementations of the DB interface
using DB::Get;

View File

@ -1428,7 +1428,7 @@ public class RocksDBTest {
assertThat(livefiles.manifestFileSize).isEqualTo(59);
assertThat(livefiles.files.size()).isEqualTo(3);
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000005");
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007");
}
}

View File

@ -604,10 +604,14 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) {
dbfull()->TEST_WaitForStatsDumpRun(
[&] { mock_clock_->MockSleepForSeconds(kPeriodSec); });
// writing to all three cf, flush default cf
// LogNumbers: default: 14, stats: 4, pikachu: 4
// LogNumbers: default: 16, stats: 10, pikachu: 5
// Since in recovery process, cfd_stats column is created after WAL is
// created, synced and MANIFEST is persisted, its log number which depends on
// logfile_number_ will be different. Since "pikachu" is never flushed, thus
// its log_number should be the smallest of the three.
ASSERT_OK(Flush());
ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber());
ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber());
ASSERT_LT(cfd_test->GetLogNumber(), cfd_stats->GetLogNumber());
ASSERT_LT(cfd_test->GetLogNumber(), cfd_default->GetLogNumber());
ASSERT_OK(Put("foo1", "v1"));
ASSERT_OK(Put("bar1", "v1"));