diff --git a/HISTORY.md b/HISTORY.md index 07d290f3c..a988e32f9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Unreleased ### Bug Fixes +* Fixed a bug where RocksDB could corrupt DBs with `avoid_flush_during_recovery == true` by removing valid WALs, leading to `Status::Corruption` with message like "SST file is ahead of WALs" when attempting to reopen. * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue. ## 7.2.0 (04/15/2022) diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 7cd5ad7e2..c480ce51a 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -337,6 +337,68 @@ TEST_F(CorruptionTest, Recovery) { Check(36, 36); } +TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) { + // Repro for bug where WALs following the point-in-time recovery were not + // retained leading to the next recovery failing. + CloseDb(); + + options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + + const std::string test_cf_name = "test_cf"; + std::vector cf_descs; + cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); + cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions()); + + uint64_t log_num; + { + options_.create_missing_column_families = true; + std::vector cfhs; + ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_)); + + ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v")); + ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v")); + ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2")); + std::vector file_nums; + GetSortedWalFiles(file_nums); + log_num = file_nums.back(); + for (auto* cfh : cfhs) { + delete cfh; + } + CloseDb(); + } + + CorruptFileWithTruncation(FileType::kWalFile, log_num, + /*bytes_to_truncate=*/1); + + { + // Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation. + options_.avoid_flush_during_recovery = true; + std::vector cfhs; + ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_)); + // Flush one but not both CFs and write some data so there's a seqno gap + // between the PITR corruption and the next DB session's first WAL. + ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2")); + ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1])); + + for (auto* cfh : cfhs) { + delete cfh; + } + CloseDb(); + } + + // With the bug, this DB open would remove the WALs following the PITR + // corruption. Then, the next recovery would fail. + for (int i = 0; i < 2; ++i) { + std::vector cfhs; + ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_)); + + for (auto* cfh : cfhs) { + delete cfh; + } + CloseDb(); + } +} + TEST_F(CorruptionTest, RecoverWriteError) { env_->writable_file_error_ = true; Status s = TryReopen(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 77987ab41..97e3d1b8a 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1240,43 +1240,6 @@ class DBImpl : public DB { std::atomic shutting_down_; - // RecoveryContext struct stores the context about version edits along - // with corresponding column_family_data and column_family_options. - class RecoveryContext { - public: - ~RecoveryContext() { - for (auto& edit_list : edit_lists_) { - for (auto* edit : edit_list) { - delete edit; - } - edit_list.clear(); - } - cfds_.clear(); - mutable_cf_opts_.clear(); - edit_lists_.clear(); - files_to_delete_.clear(); - } - - void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) { - if (map_.find(cfd->GetID()) == map_.end()) { - uint32_t size = static_cast(map_.size()); - map_.emplace(cfd->GetID(), size); - cfds_.emplace_back(cfd); - mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions()); - edit_lists_.emplace_back(autovector()); - } - uint32_t i = map_[cfd->GetID()]; - edit_lists_[i].emplace_back(new VersionEdit(edit)); - } - - std::unordered_map map_; // cf_id to index; - autovector cfds_; - autovector mutable_cf_opts_; - autovector> edit_lists_; - // files_to_delete_ contains sst files - std::set files_to_delete_; - }; - // Except in DB::Open(), WriteOptionsFile can only be called when: // Persist options to options file. // If need_mutex_lock = false, the method will lock DB mutex. @@ -1393,19 +1356,16 @@ class DBImpl : public DB { // be made to the descriptor are added to *edit. // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is // skipped. - // recovery_ctx stores the context about version edits and all those - // edits are persisted to new Manifest after successfully syncing the new WAL. virtual Status Recover( const std::vector& column_families, bool read_only = false, bool error_if_wal_file_exists = false, bool error_if_data_exists_in_wals = false, - uint64_t* recovered_seq = nullptr, - RecoveryContext* recovery_ctx = nullptr); + uint64_t* recovered_seq = nullptr); virtual bool OwnTablesAndLogs() const { return true; } // Set DB identity file, and write DB ID to manifest if necessary. - Status SetDBId(bool read_only, RecoveryContext* recovery_ctx); + Status SetDBId(bool read_only); // REQUIRES: db mutex held when calling this function, but the db mutex can // be released and re-acquired. Db mutex will be held when the function @@ -1414,15 +1374,12 @@ class DBImpl : public DB { // not referenced in the MANIFEST (e.g. // 1. It's best effort recovery; // 2. The VersionEdits referencing the SST files are appended to - // RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are + // MANIFEST, DB crashes when syncing the MANIFEST, the VersionEdits are // still not synced to MANIFEST during recovery.) - // It stores the SST files to be deleted in RecoveryContext. In the + // We delete these SST files. In the // meantime, we find out the largest file number present in the paths, and // bump up the version set's next_file_number_ to be 1 + largest_file_number. - // recovery_ctx stores the context about version edits and files to be - // deleted. All those edits are persisted to new Manifest after successfully - // syncing the new WAL. - Status DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx); + Status DeleteUnreferencedSstFiles(); // SetDbSessionId() should be called in the constuctor DBImpl() // to ensure that db_session_id_ gets updated every time the DB is opened @@ -1432,11 +1389,6 @@ class DBImpl : public DB { Status FailIfTsSizesMismatch(const ColumnFamilyHandle* column_family, const Slice& ts) const; - // recovery_ctx stores the context about version edits and - // LogAndApplyForRecovery persist all those edits to new Manifest after - // successfully syncing new WAL. - Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx); - private: friend class DB; friend class ErrorHandler; @@ -1691,10 +1643,9 @@ class DBImpl : public DB { // REQUIRES: log_numbers are sorted in ascending order // corrupted_log_found is set to true if we recover from a corrupted log file. - Status RecoverLogFiles(std::vector& log_numbers, + Status RecoverLogFiles(const std::vector& log_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_log_found, - RecoveryContext* recovery_ctx); + bool* corrupted_log_found); // The following two methods are used to flush a memtable to // storage. The first one is used at database RecoveryTime (when the @@ -1704,12 +1655,6 @@ class DBImpl : public DB { Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit); - // Move all the WAL files starting from corrupted WAL found to - // max_wal_number to avoid column family inconsistency error on recovery. It - // also removes the deleted file from the vector wal_numbers. - void MoveCorruptedWalFiles(std::vector& wal_numbers, - uint64_t corrupted_wal_number); - // Get the size of a log file and, if truncate is true, truncate the // log file to its actual size, thereby freeing preallocated space. // Return success even if truncate fails diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 0d3a3bea7..1790ed836 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -863,7 +863,7 @@ uint64_t PrecomputeMinLogNumberToKeep2PC( return min_log_number_to_keep; } -Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) { +Status DBImpl::SetDBId(bool read_only) { Status s; // Happens when immutable_db_options_.write_dbid_to_manifest is set to true // the very first time. @@ -890,14 +890,14 @@ Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) { } s = GetDbIdentityFromIdentityFile(&db_id_); if (immutable_db_options_.write_dbid_to_manifest && s.ok()) { - assert(!read_only); - assert(recovery_ctx != nullptr); - assert(versions_->GetColumnFamilySet() != nullptr); VersionEdit edit; edit.SetDBId(db_id_); + Options options; + MutableCFOptions mutable_cf_options(options); versions_->db_id_ = db_id_; - recovery_ctx->UpdateVersionEdits( - versions_->GetColumnFamilySet()->GetDefault(), edit); + s = versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(), + mutable_cf_options, &edit, &mutex_, nullptr, + /* new_descriptor_log */ false); } } else if (!read_only) { s = SetIdentityFile(env_, dbname_, db_id_); @@ -905,7 +905,7 @@ Status DBImpl::SetDBId(bool read_only, RecoveryContext* recovery_ctx) { return s; } -Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) { +Status DBImpl::DeleteUnreferencedSstFiles() { mutex_.AssertHeld(); std::vector paths; paths.push_back(NormalizePath(dbname_ + std::string(1, kFilePathSeparator))); @@ -925,6 +925,7 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) { uint64_t next_file_number = versions_->current_next_file_number(); uint64_t largest_file_number = next_file_number; + std::set files_to_delete; Status s; for (const auto& path : paths) { std::vector files; @@ -942,9 +943,8 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) { const std::string normalized_fpath = path + fname; largest_file_number = std::max(largest_file_number, number); if (type == kTableFile && number >= next_file_number && - recovery_ctx->files_to_delete_.find(normalized_fpath) == - recovery_ctx->files_to_delete_.end()) { - recovery_ctx->files_to_delete_.insert(normalized_fpath); + files_to_delete.find(normalized_fpath) == files_to_delete.end()) { + files_to_delete.insert(normalized_fpath); } } } @@ -961,7 +961,21 @@ Status DBImpl::DeleteUnreferencedSstFiles(RecoveryContext* recovery_ctx) { assert(versions_->GetColumnFamilySet()); ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault(); assert(default_cfd); - recovery_ctx->UpdateVersionEdits(default_cfd, edit); + s = versions_->LogAndApply( + default_cfd, *default_cfd->GetLatestMutableCFOptions(), &edit, &mutex_, + directories_.GetDbDir(), /*new_descriptor_log*/ false); + if (!s.ok()) { + return s; + } + + mutex_.Unlock(); + for (const auto& fname : files_to_delete) { + s = env_->DeleteFile(fname); + if (!s.ok()) { + break; + } + } + mutex_.Lock(); return s; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 37b567baf..0377a7a5c 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -399,7 +399,7 @@ IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname, Status DBImpl::Recover( const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, bool error_if_data_exists_in_wals, - uint64_t* recovered_seq, RecoveryContext* recovery_ctx) { + uint64_t* recovered_seq) { mutex_.AssertHeld(); bool is_new_db = false; @@ -518,10 +518,9 @@ Status DBImpl::Recover( if (!s.ok()) { return s; } - - s = SetDBId(read_only, recovery_ctx); + s = SetDBId(read_only); if (s.ok() && !read_only) { - s = DeleteUnreferencedSstFiles(recovery_ctx); + s = DeleteUnreferencedSstFiles(); } if (immutable_db_options_.paranoid_checks && s.ok()) { @@ -536,6 +535,10 @@ Status DBImpl::Recover( } } } + // DB mutex is already held + if (s.ok() && immutable_db_options_.persist_stats_to_disk) { + s = InitPersistStatsColumnFamily(); + } std::vector files_in_wal_dir; if (s.ok()) { @@ -605,10 +608,7 @@ Status DBImpl::Recover( WalNumber max_wal_number = versions_->GetWalSet().GetWals().rbegin()->first; edit.DeleteWalsBefore(max_wal_number + 1); - assert(recovery_ctx != nullptr); - assert(versions_->GetColumnFamilySet() != nullptr); - recovery_ctx->UpdateVersionEdits( - versions_->GetColumnFamilySet()->GetDefault(), edit); + s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_); } if (!s.ok()) { return s; @@ -644,8 +644,8 @@ Status DBImpl::Recover( std::sort(wals.begin(), wals.end()); bool corrupted_wal_found = false; - s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found, - recovery_ctx); + s = RecoverLogFiles(wals, &next_sequence, read_only, + &corrupted_wal_found); if (corrupted_wal_found && recovered_seq != nullptr) { *recovered_seq = next_sequence; } @@ -805,30 +805,10 @@ Status DBImpl::InitPersistStatsColumnFamily() { return s; } -Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) { - mutex_.AssertHeld(); - assert(versions_->descriptor_log_ == nullptr); - Status s = versions_->LogAndApply( - recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_, - recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir()); - if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) { - mutex_.Unlock(); - for (const auto& fname : recovery_ctx.files_to_delete_) { - s = env_->DeleteFile(fname); - if (!s.ok()) { - break; - } - } - mutex_.Lock(); - } - return s; -} - // REQUIRES: wal_numbers are sorted in ascending order -Status DBImpl::RecoverLogFiles(std::vector& wal_numbers, +Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, SequenceNumber* next_sequence, bool read_only, - bool* corrupted_wal_found, - RecoveryContext* recovery_ctx) { + bool* corrupted_wal_found) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -853,7 +833,6 @@ Status DBImpl::RecoverLogFiles(std::vector& wal_numbers, edit.SetColumnFamily(cfd->GetID()); version_edits.insert({cfd->GetID(), edit}); } - int job_id = next_job_id_.fetch_add(1); { auto stream = event_logger_.Log(); @@ -1277,7 +1256,6 @@ Status DBImpl::RecoverLogFiles(std::vector& wal_numbers, edit->SetLogNumber(max_wal_number + 1); } } - if (status.ok()) { // we must mark the next log number as used, even though it's // not actually used. that is because VersionSet assumes @@ -1285,40 +1263,42 @@ Status DBImpl::RecoverLogFiles(std::vector& wal_numbers, // log number versions_->MarkFileNumberUsed(max_wal_number + 1); - if (corrupted_wal_found != nullptr && *corrupted_wal_found == true && - immutable_db_options_.wal_recovery_mode == - WALRecoveryMode::kPointInTimeRecovery) { - MoveCorruptedWalFiles(wal_numbers, corrupted_wal_number); - } - - assert(recovery_ctx != nullptr); + autovector cfds; + autovector cf_opts; + autovector> edit_lists; for (auto* cfd : *versions_->GetColumnFamilySet()) { + cfds.push_back(cfd); + cf_opts.push_back(cfd->GetLatestMutableCFOptions()); auto iter = version_edits.find(cfd->GetID()); assert(iter != version_edits.end()); - recovery_ctx->UpdateVersionEdits(cfd, iter->second); + edit_lists.push_back({&iter->second}); } + std::unique_ptr wal_deletion; if (flushed) { - VersionEdit wal_deletion; + wal_deletion = std::make_unique(); if (immutable_db_options_.track_and_verify_wals_in_manifest) { - wal_deletion.DeleteWalsBefore(max_wal_number + 1); + wal_deletion->DeleteWalsBefore(max_wal_number + 1); } if (!allow_2pc()) { // In non-2pc mode, flushing the memtables of the column families // means we can advance min_log_number_to_keep. - wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1); + wal_deletion->SetMinLogNumberToKeep(max_wal_number + 1); } - assert(versions_->GetColumnFamilySet() != nullptr); - recovery_ctx->UpdateVersionEdits( - versions_->GetColumnFamilySet()->GetDefault(), wal_deletion); + edit_lists.back().push_back(wal_deletion.get()); } + + // write MANIFEST with update + status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_, + directories_.GetDbDir(), + /*new_descriptor_log=*/true); } } if (status.ok()) { if (data_seen && !flushed) { status = RestoreAliveLogFiles(wal_numbers); - } else if (!wal_numbers.empty()) { + } else { // If there's no data in the WAL, or we flushed all the data, still // truncate the log file. If the process goes into a crash loop before // the file is deleted, the preallocated space will never get freed. @@ -1334,48 +1314,6 @@ Status DBImpl::RecoverLogFiles(std::vector& wal_numbers, return status; } -void DBImpl::MoveCorruptedWalFiles(std::vector& wal_numbers, - uint64_t corrupted_wal_number) { - size_t num_wals = wal_numbers.size(); - // Find the first corrupted wal. - auto iter = std::lower_bound(wal_numbers.begin(), wal_numbers.end(), - corrupted_wal_number); - auto corrupt_start_iter = iter; - - // Increment iter to move WAL files from first corrupted_wal_number + 1. - iter++; - - std::string archival_path = - ArchivalDirectory(immutable_db_options_.GetWalDir()); - Status create_status = env_->CreateDirIfMissing(archival_path); - - // create_status is only checked when it needs to move the corrupted WAL files - // to archive folder. - create_status.PermitUncheckedError(); - - // Truncate the last WAL to reclaim the pre allocated space before - // moving it. - GetLogSizeAndMaybeTruncate(wal_numbers.back(), /*truncate=*/true, nullptr) - .PermitUncheckedError(); - - // Move all the WAL files from corrupted_wal_number + 1 to last WAL - // (max_wal_number) to avoid column family inconsistency error to archival - // directory. If its unable to create archive dir, it will delete the - // corrupted WAL files. - // We are moving all but first corrupted WAL file to a different folder. - while (iter != wal_numbers.end()) { - LogFileNumberSize log(*iter); - std::string fname = LogFileName(immutable_db_options_.GetWalDir(), *iter); -#ifndef ROCKSDB_LITE - if (create_status.ok()) { - wal_manager_.ArchiveWALFile(fname, *iter); - } -#endif - iter++; - } - wal_numbers.erase(corrupt_start_iter + 1, wal_numbers.begin() + num_wals); -} - Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate, LogFileNumberSize* log_ptr) { LogFileNumberSize log(wal_number); @@ -1438,8 +1376,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector& wal_numbers) { // log has such preallocated space, so we only truncate for the last log. LogFileNumberSize log; s = GetLogSizeAndMaybeTruncate( - wal_number, - /*truncate=*/(wal_number == wal_numbers.back()), &log); + wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log); if (!s.ok()) { break; } @@ -1800,13 +1737,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); impl->mutex_.Lock(); - - RecoveryContext recovery_ctx; - // Handles create_if_missing, error_if_exists uint64_t recovered_seq(kMaxSequenceNumber); - s = impl->Recover(column_families, false, false, false, &recovered_seq, - &recovery_ctx); + s = impl->Recover(column_families, false, false, false, &recovered_seq); if (s.ok()) { uint64_t new_log_number = impl->versions_->NewFileNumber(); log::Writer* new_log = nullptr; @@ -1823,6 +1756,40 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { + // set column family handles + for (auto cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (cfd != nullptr) { + handles->push_back( + new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + impl->NewThreadStatusCfInfo(cfd); + } else { + if (db_options.create_missing_column_families) { + // missing column family, create it + ColumnFamilyHandle* handle; + impl->mutex_.Unlock(); + s = impl->CreateColumnFamily(cf.options, cf.name, &handle); + impl->mutex_.Lock(); + if (s.ok()) { + handles->push_back(handle); + } else { + break; + } + } else { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + } + } + } + if (s.ok()) { + SuperVersionContext sv_context(/* create_superversion */ true); + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + impl->InstallSuperVersionAndScheduleWork( + cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); + } + sv_context.Clean(); if (impl->two_write_queues_) { impl->log_write_mutex_.Lock(); } @@ -1835,15 +1802,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { // In WritePrepared there could be gap in sequence numbers. This breaks - // the trick we use in kPointInTimeRecovery which assumes the first seq - // in the log right after the corrupted log is one larger than the last - // seq we read from the wals. To let this trick keep working, we add a - // dummy entry with the expected sequence to the first log right after - // recovery. In non-WritePrepared case also the new log after recovery - // could be empty, and thus missing the consecutive seq hint to - // distinguish middle-log corruption to - // corrupted-log-remained-after-recovery. This case also will be - // addressed by a dummy write. + // the trick we use in kPointInTimeRecovery which assumes the first seq in + // the log right after the corrupted log is one larger than the last seq + // we read from the wals. To let this trick keep working, we add a dummy + // entry with the expected sequence to the first log right after recovery. + // In non-WritePrepared case also the new log after recovery could be + // empty, and thus missing the consecutive seq hint to distinguish + // middle-log corruption to corrupted-log-remained-after-recovery. This + // case also will be addressed by a dummy write. if (recovered_seq != kMaxSequenceNumber) { WriteBatch empty_batch; WriteBatchInternal::SetSequence(&empty_batch, recovered_seq); @@ -1862,52 +1828,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } } } - if (s.ok()) { - s = impl->LogAndApplyForRecovery(recovery_ctx); - } - - if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { - impl->mutex_.AssertHeld(); - s = impl->InitPersistStatsColumnFamily(); - } - - if (s.ok()) { - // set column family handles - for (auto cf : column_families) { - auto cfd = - impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); - if (cfd != nullptr) { - handles->push_back( - new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); - impl->NewThreadStatusCfInfo(cfd); - } else { - if (db_options.create_missing_column_families) { - // missing column family, create it - ColumnFamilyHandle* handle; - impl->mutex_.Unlock(); - s = impl->CreateColumnFamily(cf.options, cf.name, &handle); - impl->mutex_.Lock(); - if (s.ok()) { - handles->push_back(handle); - } else { - break; - } - } else { - s = Status::InvalidArgument("Column family not found", cf.name); - break; - } - } - } - } - - if (s.ok()) { - SuperVersionContext sv_context(/* create_superversion */ true); - for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - impl->InstallSuperVersionAndScheduleWork( - cfd, &sv_context, *cfd->GetLatestMutableCFOptions()); - } - sv_context.Clean(); - } if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) { // try to read format version s = impl->PersistentStatsProcessFormatVersion(); @@ -1933,8 +1853,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (cfd->ioptions()->merge_operator != nullptr && !cfd->mem()->IsMergeOperatorSupported()) { s = Status::InvalidArgument( - "The memtable of column family %s does not support merge " - "operator " + "The memtable of column family %s does not support merge operator " "its options.merge_operator is non-null", cfd->GetName().c_str()); } diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index d1a8709da..1e3c9f2ac 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -33,8 +33,7 @@ DBImplSecondary::~DBImplSecondary() {} Status DBImplSecondary::Recover( const std::vector& column_families, bool /*readonly*/, bool /*error_if_wal_file_exists*/, - bool /*error_if_data_exists_in_wals*/, uint64_t*, - RecoveryContext* /*recovery_ctx*/) { + bool /*error_if_data_exists_in_wals*/, uint64_t*) { mutex_.AssertHeld(); JobContext job_context(0); diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index fcc86cc87..d3a7940b5 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -81,8 +81,8 @@ class DBImplSecondary : public DBImpl { // and log_readers_ to facilitate future operations. Status Recover(const std::vector& column_families, bool read_only, bool error_if_wal_file_exists, - bool error_if_data_exists_in_wals, uint64_t* = nullptr, - RecoveryContext* recovery_ctx = nullptr) override; + bool error_if_data_exists_in_wals, + uint64_t* = nullptr) override; // Implementations of the DB interface using DB::Get; diff --git a/java/src/test/java/org/rocksdb/RocksDBTest.java b/java/src/test/java/org/rocksdb/RocksDBTest.java index 422bed40c..4193bcb44 100644 --- a/java/src/test/java/org/rocksdb/RocksDBTest.java +++ b/java/src/test/java/org/rocksdb/RocksDBTest.java @@ -1428,7 +1428,7 @@ public class RocksDBTest { assertThat(livefiles.manifestFileSize).isEqualTo(59); assertThat(livefiles.files.size()).isEqualTo(3); assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT"); - assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000005"); + assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004"); assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007"); } } diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index 1fe5503cb..59e7be3d9 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -604,14 +604,10 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { dbfull()->TEST_WaitForStatsDumpRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flush default cf - // LogNumbers: default: 16, stats: 10, pikachu: 5 - // Since in recovery process, cfd_stats column is created after WAL is - // created, synced and MANIFEST is persisted, its log number which depends on - // logfile_number_ will be different. Since "pikachu" is never flushed, thus - // its log_number should be the smallest of the three. + // LogNumbers: default: 14, stats: 4, pikachu: 4 ASSERT_OK(Flush()); - ASSERT_LT(cfd_test->GetLogNumber(), cfd_stats->GetLogNumber()); - ASSERT_LT(cfd_test->GetLogNumber(), cfd_default->GetLogNumber()); + ASSERT_EQ(cfd_stats->GetLogNumber(), cfd_test->GetLogNumber()); + ASSERT_LT(cfd_stats->GetLogNumber(), cfd_default->GetLogNumber()); ASSERT_OK(Put("foo1", "v1")); ASSERT_OK(Put("bar1", "v1"));