Cleanup stale manifests outside of full purge
Summary: - Keep track of obsolete manifests in VersionSet - Updated FindObsoleteFiles() to put obsolete manifests in the JobContext for later use by PurgeObsoleteFiles() - Added test case that verifies a stale manifest is deleted by a non-full purge Test Plan: $ ./backupable_db_test --gtest_filter=BackupableDBTest.ChangeManifestDuringBackupCreation Reviewers: IslamAbdelRahman, yoshinorim, sdong Reviewed By: sdong Subscribers: andrewkr, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D55269
This commit is contained in:
parent
f71fc77b7c
commit
d9620239d2
@ -572,6 +572,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
|||||||
// Get obsolete files. This function will also update the list of
|
// Get obsolete files. This function will also update the list of
|
||||||
// pending files in VersionSet().
|
// pending files in VersionSet().
|
||||||
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
|
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
|
||||||
|
&job_context->manifest_delete_files,
|
||||||
job_context->min_pending_output);
|
job_context->min_pending_output);
|
||||||
|
|
||||||
// store the current filenum, lognum, etc
|
// store the current filenum, lognum, etc
|
||||||
@ -689,9 +690,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto candidate_files = state.full_scan_candidate_files;
|
auto candidate_files = state.full_scan_candidate_files;
|
||||||
candidate_files.reserve(candidate_files.size() +
|
candidate_files.reserve(
|
||||||
state.sst_delete_files.size() +
|
candidate_files.size() + state.sst_delete_files.size() +
|
||||||
state.log_delete_files.size());
|
state.log_delete_files.size() + state.manifest_delete_files.size());
|
||||||
// We may ignore the dbname when generating the file names.
|
// We may ignore the dbname when generating the file names.
|
||||||
const char* kDumbDbName = "";
|
const char* kDumbDbName = "";
|
||||||
for (auto file : state.sst_delete_files) {
|
for (auto file : state.sst_delete_files) {
|
||||||
@ -707,6 +708,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
|
|||||||
0);
|
0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for (const auto& filename : state.manifest_delete_files) {
|
||||||
|
candidate_files.emplace_back(filename, 0);
|
||||||
|
}
|
||||||
|
|
||||||
// dedup state.candidate_files so we don't try to delete the same
|
// dedup state.candidate_files so we don't try to delete the same
|
||||||
// file twice
|
// file twice
|
||||||
|
@ -22,9 +22,9 @@ class MemTable;
|
|||||||
struct JobContext {
|
struct JobContext {
|
||||||
inline bool HaveSomethingToDelete() const {
|
inline bool HaveSomethingToDelete() const {
|
||||||
return full_scan_candidate_files.size() || sst_delete_files.size() ||
|
return full_scan_candidate_files.size() || sst_delete_files.size() ||
|
||||||
log_delete_files.size() || new_superversion != nullptr ||
|
log_delete_files.size() || manifest_delete_files.size() ||
|
||||||
superversions_to_free.size() > 0 || memtables_to_free.size() > 0 ||
|
new_superversion != nullptr || superversions_to_free.size() > 0 ||
|
||||||
logs_to_free.size() > 0;
|
memtables_to_free.size() > 0 || logs_to_free.size() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Structure to store information for candidate files to delete.
|
// Structure to store information for candidate files to delete.
|
||||||
@ -56,6 +56,9 @@ struct JobContext {
|
|||||||
// a list of log files that we need to delete
|
// a list of log files that we need to delete
|
||||||
std::vector<uint64_t> log_delete_files;
|
std::vector<uint64_t> log_delete_files;
|
||||||
|
|
||||||
|
// a list of manifest files that we need to delete
|
||||||
|
std::vector<std::string> manifest_delete_files;
|
||||||
|
|
||||||
// a list of memtables to be free
|
// a list of memtables to be free
|
||||||
autovector<MemTable*> memtables_to_free;
|
autovector<MemTable*> memtables_to_free;
|
||||||
|
|
||||||
|
@ -2254,6 +2254,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
|
|||||||
db_options_->disableDataSync ? nullptr : db_directory);
|
db_options_->disableDataSync ? nullptr : db_directory);
|
||||||
// Leave the old file behind since PurgeObsoleteFiles will take care of it
|
// Leave the old file behind since PurgeObsoleteFiles will take care of it
|
||||||
// later. It's unsafe to delete now since file deletion may be disabled.
|
// later. It's unsafe to delete now since file deletion may be disabled.
|
||||||
|
obsolete_manifests_.emplace_back(
|
||||||
|
DescriptorFileName("", manifest_file_number_));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
@ -3388,7 +3390,10 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
||||||
|
std::vector<std::string>* manifest_filenames,
|
||||||
uint64_t min_pending_output) {
|
uint64_t min_pending_output) {
|
||||||
|
assert(manifest_filenames->empty());
|
||||||
|
obsolete_manifests_.swap(*manifest_filenames);
|
||||||
std::vector<FileMetaData*> pending_files;
|
std::vector<FileMetaData*> pending_files;
|
||||||
for (auto f : obsolete_files_) {
|
for (auto f : obsolete_files_) {
|
||||||
if (f->fd.GetNumber() < min_pending_output) {
|
if (f->fd.GetNumber() < min_pending_output) {
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <set>
|
#include <set>
|
||||||
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -697,6 +698,7 @@ class VersionSet {
|
|||||||
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
|
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
|
||||||
|
|
||||||
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
||||||
|
std::vector<std::string>* manifest_filenames,
|
||||||
uint64_t min_pending_output);
|
uint64_t min_pending_output);
|
||||||
|
|
||||||
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
|
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
|
||||||
@ -758,6 +760,7 @@ class VersionSet {
|
|||||||
uint64_t manifest_file_size_;
|
uint64_t manifest_file_size_;
|
||||||
|
|
||||||
std::vector<FileMetaData*> obsolete_files_;
|
std::vector<FileMetaData*> obsolete_files_;
|
||||||
|
std::vector<std::string> obsolete_manifests_;
|
||||||
|
|
||||||
// env options for all reads and writes except compactions
|
// env options for all reads and writes except compactions
|
||||||
const EnvOptions& env_options_;
|
const EnvOptions& env_options_;
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
|
#include "db/db_impl.h"
|
||||||
#include "db/filename.h"
|
#include "db/filename.h"
|
||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
#include "port/stack_trace.h"
|
#include "port/stack_trace.h"
|
||||||
@ -1318,10 +1319,22 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
|
|||||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
|
||||||
|
|
||||||
flush_thread.join();
|
flush_thread.join();
|
||||||
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
|
||||||
|
// The last manifest roll would've already been cleaned up by the full scan
|
||||||
|
// that happens when CreateNewBackup invokes EnableFileDeletions. We need to
|
||||||
|
// trigger another roll to verify non-full scan purges stale manifests.
|
||||||
|
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_.get());
|
||||||
|
std::string prev_manifest_path =
|
||||||
|
DescriptorFileName(dbname_, db_impl->TEST_Current_Manifest_FileNo());
|
||||||
|
FillDB(db_.get(), 0, 100);
|
||||||
|
ASSERT_OK(env_->FileExists(prev_manifest_path));
|
||||||
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||||
|
ASSERT_TRUE(env_->FileExists(prev_manifest_path).IsNotFound());
|
||||||
|
|
||||||
CloseDBAndBackupEngine();
|
CloseDBAndBackupEngine();
|
||||||
DestroyDB(dbname_, Options());
|
DestroyDB(dbname_, Options());
|
||||||
AssertBackupConsistency(0, 0, 100);
|
AssertBackupConsistency(0, 0, 100);
|
||||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// see https://github.com/facebook/rocksdb/issues/921
|
// see https://github.com/facebook/rocksdb/issues/921
|
||||||
|
Loading…
x
Reference in New Issue
Block a user