Let DBSecondary close files after catch up (#6114)
Summary:
After a secondary instance replays the logs from the primary, certain files become obsolete. The secondary should find these files, evict their table readers from the table cache, and close them. If this is not done, the secondary will hold on to these files and prevent their space from being freed.

Test plan (devserver):
```
$ ./db_secondary_test --gtest_filter=DBSecondaryTest.SecondaryCloseFiles
$ make check
$ ./db_stress -ops_per_thread=100000 -enable_secondary=true -threads=32 -secondary_catch_up_one_in=10000 -clear_column_family_one_in=1000 -reopen=100
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6114

Differential Revision: D18769998

Pulled By: riversand963

fbshipit-source-id: 5d1f151567247196164e1b79d8402fa2045b9120
parent 96da9d7224
commit 98c414772c
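For context, a secondary instance is typically driven as in the sketch below; after this change, each call to `TryCatchUpWithPrimary()` also releases files that the replayed updates made obsolete. This is only an illustrative sketch against the public RocksDB API (`DB::OpenAsSecondary`, `DB::TryCatchUpWithPrimary`); the paths and the `max_open_files` setting are assumptions for illustration, not part of the commit.

```cpp
#include <cassert>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.max_open_files = -1;  // secondary keeps all table readers cached

  // Hypothetical paths, for illustration only.
  const std::string kPrimaryPath = "/tmp/rocksdb_primary";
  const std::string kSecondaryPath = "/tmp/rocksdb_secondary";

  // Open a read-only secondary that follows the primary at kPrimaryPath and
  // keeps its own info logs under kSecondaryPath.
  rocksdb::DB* secondary = nullptr;
  rocksdb::Status s = rocksdb::DB::OpenAsSecondary(options, kPrimaryPath,
                                                   kSecondaryPath, &secondary);
  assert(s.ok());

  // Each catch-up replays MANIFEST/WAL updates from the primary. Files made
  // obsolete by those updates (e.g. inputs of a finished compaction) need to
  // be evicted from the table cache and closed here; otherwise the secondary
  // pins them and the primary's deletions cannot free disk space.
  s = secondary->TryCatchUpWithPrimary();
  assert(s.ok());

  delete secondary;
  return 0;
}
```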
```diff
@@ -1109,6 +1109,8 @@ class DBImpl : public DB {
       bool read_only = false, bool error_if_log_file_exist = false,
       bool error_if_data_exists_in_logs = false);
 
+  virtual bool OwnTablesAndLogs() const { return true; }
+
  private:
   friend class DB;
   friend class ErrorHandler;
```
```diff
@@ -385,6 +385,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
     w->Close();
   }
 
+  bool own_files = OwnTablesAndLogs();
   std::unordered_set<uint64_t> files_to_del;
   for (const auto& candidate_file : candidate_files) {
     const std::string& to_delete = candidate_file.file_name;
```
```diff
@@ -484,6 +485,12 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
     }
 #endif  // !ROCKSDB_LITE
 
+    // If I do not own these files, e.g. secondary instance with max_open_files
+    // = -1, then no need to delete or schedule delete these files since they
+    // will be removed by their owner, e.g. the primary instance.
+    if (!own_files) {
+      continue;
+    }
     Status file_deletion_status;
     if (schedule_only) {
       InstrumentedMutexLock guard_lock(&mutex_);
```
```diff
@@ -495,7 +502,6 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
 
   {
     // After purging obsolete files, remove them from files_grabbed_for_purge_.
-    // Use a temporary vector to perform bulk deletion via swap.
     InstrumentedMutexLock guard_lock(&mutex_);
     autovector<uint64_t> to_be_removed;
     for (auto fn : files_grabbed_for_purge_) {
```
```diff
@@ -11,6 +11,7 @@
 #include "db/merge_context.h"
 #include "logging/auto_roll_logger.h"
 #include "monitoring/perf_context_imp.h"
+#include "util/cast_util.h"
 
 namespace rocksdb {
 
```
```diff
@@ -497,45 +498,61 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
   // read the manifest and apply new changes to the secondary instance
   std::unordered_set<ColumnFamilyData*> cfds_changed;
   JobContext job_context(0, true /*create_superversion*/);
-  InstrumentedMutexLock lock_guard(&mutex_);
-  s = static_cast<ReactiveVersionSet*>(versions_.get())
-          ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
-  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
-                 static_cast<uint64_t>(versions_->LastSequence()));
-  for (ColumnFamilyData* cfd : cfds_changed) {
-    if (cfd->IsDropped()) {
-      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
-                      cfd->GetName().c_str());
-      continue;
-    }
-    VersionStorageInfo::LevelSummaryStorage tmp;
-    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] Level summary: %s\n",
-                    cfd->GetName().c_str(),
-                    cfd->current()->storage_info()->LevelSummary(&tmp));
-  }
-
-  // list wal_dir to discover new WALs and apply new changes to the secondary
-  // instance
-  if (s.ok()) {
-    s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
-  }
-  if (s.IsPathNotFound()) {
-    ROCKS_LOG_INFO(immutable_db_options_.info_log,
-                   "Secondary tries to read WAL, but WAL file(s) have already "
-                   "been purged by primary.");
-    s = Status::OK();
-  }
-  if (s.ok()) {
-    for (auto cfd : cfds_changed) {
-      cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
-                                     &job_context.memtables_to_free);
-      auto& sv_context = job_context.superversion_contexts.back();
-      cfd->InstallSuperVersion(&sv_context, &mutex_);
-      sv_context.NewSuperVersion();
+  {
+    InstrumentedMutexLock lock_guard(&mutex_);
+    s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
+            ->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
+
+    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
+                   static_cast<uint64_t>(versions_->LastSequence()));
+    for (ColumnFamilyData* cfd : cfds_changed) {
+      if (cfd->IsDropped()) {
+        ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
+                        cfd->GetName().c_str());
+        continue;
+      }
+      VersionStorageInfo::LevelSummaryStorage tmp;
+      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
+                      "[%s] Level summary: %s\n", cfd->GetName().c_str(),
+                      cfd->current()->storage_info()->LevelSummary(&tmp));
+    }
+
+    // list wal_dir to discover new WALs and apply new changes to the secondary
+    // instance
+    if (s.ok()) {
+      s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
+    }
+    if (s.IsPathNotFound()) {
+      ROCKS_LOG_INFO(
+          immutable_db_options_.info_log,
+          "Secondary tries to read WAL, but WAL file(s) have already "
+          "been purged by primary.");
+      s = Status::OK();
+    }
+    if (s.ok()) {
+      for (auto cfd : cfds_changed) {
+        cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
+                                       &job_context.memtables_to_free);
+        auto& sv_context = job_context.superversion_contexts.back();
+        cfd->InstallSuperVersion(&sv_context, &mutex_);
+        sv_context.NewSuperVersion();
+      }
     }
   }
   job_context.Clean();
+
+  // Cleanup unused, obsolete files.
+  JobContext purge_files_job_context(0);
+  {
+    InstrumentedMutexLock lock_guard(&mutex_);
+    // Currently, secondary instance does not own the database files, thus it
+    // is unnecessary for the secondary to force full scan.
+    FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
+  }
+  if (purge_files_job_context.HaveSomethingToDelete()) {
+    PurgeObsoleteFiles(purge_files_job_context);
+  }
+  purge_files_job_context.Clean();
   return s;
 }
```
```diff
@@ -172,6 +172,24 @@ class DBImplSecondary : public DBImpl {
     return Status::NotSupported("Not supported operation in secondary mode.");
   }
 
+  using DBImpl::SetDBOptions;
+  Status SetDBOptions(const std::unordered_map<std::string, std::string>&
+                      /*options_map*/) override {
+    // Currently not supported because changing certain options may cause
+    // flush/compaction.
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
+  using DBImpl::SetOptions;
+  Status SetOptions(
+      ColumnFamilyHandle* /*cfd*/,
+      const std::unordered_map<std::string, std::string>& /*options_map*/)
+      override {
+    // Currently not supported because changing certain options may cause
+    // flush/compaction and/or write to MANIFEST.
+    return Status::NotSupported("Not supported operation in secondary mode.");
+  }
+
   using DBImpl::SyncWAL;
   Status SyncWAL() override {
     return Status::NotSupported("Not supported operation in secondary mode.");
```
```diff
@@ -269,6 +287,14 @@ class DBImplSecondary : public DBImpl {
     return s;
   }
 
+  bool OwnTablesAndLogs() const override {
+    // Currently, the secondary instance does not own the database files. It
+    // simply opens the files of the primary instance and tracks their file
+    // descriptors until they become obsolete. In the future, the secondary may
+    // create links to database files. OwnTablesAndLogs will return true then.
+    return false;
+  }
+
  private:
   friend class DB;
 
```
```diff
@@ -195,6 +195,90 @@ TEST_F(DBSecondaryTest, OpenAsSecondary) {
   verify_db_func("new_foo_value", "new_bar_value");
 }
 
+namespace {
+class TraceFileEnv : public EnvWrapper {
+ public:
+  explicit TraceFileEnv(Env* target) : EnvWrapper(target) {}
+  Status NewRandomAccessFile(const std::string& f,
+                             std::unique_ptr<RandomAccessFile>* r,
+                             const EnvOptions& env_options) override {
+    class TracedRandomAccessFile : public RandomAccessFile {
+     public:
+      TracedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
+                             std::atomic<int>& counter)
+          : target_(std::move(target)), files_closed_(counter) {}
+      ~TracedRandomAccessFile() override {
+        files_closed_.fetch_add(1, std::memory_order_relaxed);
+      }
+      Status Read(uint64_t offset, size_t n, Slice* result,
+                  char* scratch) const override {
+        return target_->Read(offset, n, result, scratch);
+      }
+
+     private:
+      std::unique_ptr<RandomAccessFile> target_;
+      std::atomic<int>& files_closed_;
+    };
+    Status s = target()->NewRandomAccessFile(f, r, env_options);
+    if (s.ok()) {
+      r->reset(new TracedRandomAccessFile(std::move(*r), files_closed_));
+    }
+    return s;
+  }
+
+  int files_closed() const {
+    return files_closed_.load(std::memory_order_relaxed);
+  }
+
+ private:
+  std::atomic<int> files_closed_{0};
+};
+}  // namespace
+
+TEST_F(DBSecondaryTest, SecondaryCloseFiles) {
+  Options options;
+  options.env = env_;
+  options.max_open_files = 1;
+  options.disable_auto_compactions = true;
+  Reopen(options);
+  Options options1;
+  std::unique_ptr<Env> traced_env(new TraceFileEnv(env_));
+  options1.env = traced_env.get();
+  OpenSecondary(options1);
+
+  static const auto verify_db = [&]() {
+    std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));
+    std::unique_ptr<Iterator> iter2(db_secondary_->NewIterator(ReadOptions()));
+    for (iter1->SeekToFirst(), iter2->SeekToFirst();
+         iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
+      ASSERT_EQ(iter1->key(), iter2->key());
+      ASSERT_EQ(iter1->value(), iter2->value());
+    }
+    ASSERT_FALSE(iter1->Valid());
+    ASSERT_FALSE(iter2->Valid());
+  };
+
+  ASSERT_OK(Put("a", "value"));
+  ASSERT_OK(Put("c", "value"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+  verify_db();
+
+  ASSERT_OK(Put("b", "value"));
+  ASSERT_OK(Put("d", "value"));
+  ASSERT_OK(Flush());
+  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+  verify_db();
+
+  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
+  ASSERT_EQ(2, static_cast<TraceFileEnv*>(traced_env.get())->files_closed());
+
+  Status s = db_secondary_->SetDBOptions({{"max_open_files", "-1"}});
+  ASSERT_TRUE(s.IsNotSupported());
+  CloseSecondary();
+}
+
 TEST_F(DBSecondaryTest, OpenAsSecondaryWALTailing) {
   Options options;
   options.env = env_;
```