BlobDB: Fix expired file not being evicted (#4294)
Summary: Fix expired file not being evicted from the DB. We have a background task (previously called `CheckSeqFiles` and I rename it to `EvictExpiredFiles`) to scan and remove expired files, but it only close the files, not marking them as expired. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4294 Differential Revision: D9415984 Pulled By: yiwu-arbug fbshipit-source-id: eff7bf0331c52a7ccdb02318602bff7f64f3ef3d
This commit is contained in:
parent
d5612b43de
commit
7188bd34f3
@ -212,8 +212,8 @@ void BlobDBImpl::StartBackgroundTasks() {
|
|||||||
tqueue_.add(kSanityCheckPeriodMillisecs,
|
tqueue_.add(kSanityCheckPeriodMillisecs,
|
||||||
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
|
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
|
||||||
tqueue_.add(
|
tqueue_.add(
|
||||||
kCheckSeqFilesPeriodMillisecs,
|
kEvictExpiredFilesPeriodMillisecs,
|
||||||
std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1));
|
std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
|
Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
|
||||||
@ -1282,29 +1282,38 @@ bool BlobDBImpl::VisibleToActiveSnapshot(
|
|||||||
return oldest_snapshot < obsolete_sequence;
|
return oldest_snapshot < obsolete_sequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
|
std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
|
||||||
if (aborted) return std::make_pair(false, -1);
|
if (aborted) {
|
||||||
|
return std::make_pair(false, -1);
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<std::shared_ptr<BlobFile>> process_files;
|
std::vector<std::shared_ptr<BlobFile>> process_files;
|
||||||
|
uint64_t now = EpochNow();
|
||||||
{
|
{
|
||||||
uint64_t epoch_now = EpochNow();
|
|
||||||
|
|
||||||
ReadLock rl(&mutex_);
|
ReadLock rl(&mutex_);
|
||||||
for (auto bfile : open_ttl_files_) {
|
for (auto p : blob_files_) {
|
||||||
|
auto& blob_file = p.second;
|
||||||
|
ReadLock file_lock(&blob_file->mutex_);
|
||||||
|
if (blob_file->HasTTL() && !blob_file->Obsolete() &&
|
||||||
|
blob_file->GetExpirationRange().second <= now) {
|
||||||
|
process_files.push_back(blob_file);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SequenceNumber seq = GetLatestSequenceNumber();
|
||||||
{
|
{
|
||||||
ReadLock lockbfile_r(&bfile->mutex_);
|
|
||||||
|
|
||||||
if (bfile->expiration_range_.second > epoch_now) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
process_files.push_back(bfile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
MutexLock l(&write_mutex_);
|
MutexLock l(&write_mutex_);
|
||||||
for (auto bfile : process_files) {
|
for (auto& blob_file : process_files) {
|
||||||
CloseBlobFile(bfile);
|
WriteLock file_lock(&blob_file->mutex_);
|
||||||
|
if (!blob_file->Immutable()) {
|
||||||
|
CloseBlobFile(blob_file, false /*need_lock*/);
|
||||||
|
}
|
||||||
|
// Need to double check if the file is obsolete.
|
||||||
|
if (!blob_file->Obsolete()) {
|
||||||
|
ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::make_pair(true, -1);
|
return std::make_pair(true, -1);
|
||||||
@ -1581,8 +1590,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
|
// We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
|
||||||
// avoid user writes writing to the file, and avoid CheckSeqFiles close
|
// avoid user writes writing to the file, and avoid
|
||||||
// the file by mistake.
|
// EvictExpiredFiles close the file by mistake.
|
||||||
WriteLock wl(&mutex_);
|
WriteLock wl(&mutex_);
|
||||||
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
|
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
|
||||||
}
|
}
|
||||||
@ -1851,6 +1860,10 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
|
|||||||
|
|
||||||
void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
|
void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
|
||||||
|
|
||||||
|
void BlobDBImpl::TEST_EvictExpiredFiles() {
|
||||||
|
EvictExpiredFiles(false /*abort*/);
|
||||||
|
}
|
||||||
|
|
||||||
uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
|
uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
|
||||||
#endif // !NDEBUG
|
#endif // !NDEBUG
|
||||||
|
|
||||||
|
@ -115,8 +115,8 @@ class BlobDBImpl : public BlobDB {
|
|||||||
// how often to schedule delete obs files periods
|
// how often to schedule delete obs files periods
|
||||||
static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
|
static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
|
||||||
|
|
||||||
// how often to schedule check seq files period
|
// how often to schedule expired files eviction.
|
||||||
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
|
||||||
|
|
||||||
// when should oldest file be evicted:
|
// when should oldest file be evicted:
|
||||||
// on reaching 90% of blob_dir_size
|
// on reaching 90% of blob_dir_size
|
||||||
@ -204,6 +204,8 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
void TEST_RunGC();
|
void TEST_RunGC();
|
||||||
|
|
||||||
|
void TEST_EvictExpiredFiles();
|
||||||
|
|
||||||
void TEST_DeleteObsoleteFiles();
|
void TEST_DeleteObsoleteFiles();
|
||||||
|
|
||||||
uint64_t TEST_live_sst_size();
|
uint64_t TEST_live_sst_size();
|
||||||
@ -270,7 +272,7 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
// periodically check if open blob files and their TTL's has expired
|
// periodically check if open blob files and their TTL's has expired
|
||||||
// if expired, close the sequential writer and make the file immutable
|
// if expired, close the sequential writer and make the file immutable
|
||||||
std::pair<bool, int64_t> CheckSeqFiles(bool aborted);
|
std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
|
||||||
|
|
||||||
// if the number of open files, approaches ULIMIT's this
|
// if the number of open files, approaches ULIMIT's this
|
||||||
// task will close random readers, which are kept around for
|
// task will close random readers, which are kept around for
|
||||||
|
@ -1385,6 +1385,36 @@ TEST_F(BlobDBTest, FilterForFIFOEviction) {
|
|||||||
VerifyDB(data_after_compact);
|
VerifyDB(data_after_compact);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// File should be evicted after expiration.
|
||||||
|
TEST_F(BlobDBTest, EvictExpiredFile) {
|
||||||
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.ttl_range_secs = 100;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
|
bdb_options.disable_background_tasks = true;
|
||||||
|
Options options;
|
||||||
|
options.env = mock_env_.get();
|
||||||
|
Open(bdb_options, options);
|
||||||
|
mock_env_->set_current_time(50);
|
||||||
|
std::map<std::string, std::string> data;
|
||||||
|
ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
|
||||||
|
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
|
||||||
|
ASSERT_EQ(1, blob_files.size());
|
||||||
|
auto blob_file = blob_files[0];
|
||||||
|
ASSERT_FALSE(blob_file->Immutable());
|
||||||
|
ASSERT_FALSE(blob_file->Obsolete());
|
||||||
|
VerifyDB(data);
|
||||||
|
mock_env_->set_current_time(250);
|
||||||
|
// The key should expired now.
|
||||||
|
blob_db_impl()->TEST_EvictExpiredFiles();
|
||||||
|
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||||
|
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||||
|
ASSERT_TRUE(blob_file->Immutable());
|
||||||
|
ASSERT_TRUE(blob_file->Obsolete());
|
||||||
|
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||||
|
ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||||
|
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace blob_db
|
} // namespace blob_db
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user