Fix deleting obsolete files #2
Summary: For description of the bug, see comment in db_test. The fix is pretty straight forward. Test Plan: added unit test. eventually we need better testing of FOF/POF process. Reviewers: yhchiang, rven, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D33081
This commit is contained in:
parent
1851f977c2
commit
863009b5a5
@ -464,8 +464,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
}
|
||||
}
|
||||
|
||||
// don't delete files that might be currently written to from compaction
|
||||
// threads
|
||||
if (!pending_outputs_.empty()) {
|
||||
job_context->min_pending_output = *pending_outputs_.begin();
|
||||
} else {
|
||||
// delete all of them
|
||||
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
// get obsolete files
|
||||
versions_->GetObsoleteFiles(&job_context->sst_delete_files);
|
||||
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
|
||||
job_context->min_pending_output);
|
||||
|
||||
// store the current filenum, lognum, etc
|
||||
job_context->manifest_file_number = versions_->manifest_file_number();
|
||||
@ -474,14 +484,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
job_context->log_number = versions_->MinLogNumber();
|
||||
job_context->prev_log_number = versions_->prev_log_number();
|
||||
|
||||
// don't delete live files
|
||||
if (pending_outputs_.size()) {
|
||||
job_context->min_pending_output = *pending_outputs_.begin();
|
||||
} else {
|
||||
// delete all of them
|
||||
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
versions_->AddLiveFiles(&job_context->sst_live);
|
||||
if (doing_the_full_scan) {
|
||||
for (uint32_t path_id = 0; path_id < db_options_.db_paths.size();
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "db/job_context.h"
|
||||
#include "db/version_set.h"
|
||||
#include "db/write_batch_internal.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/cache.h"
|
||||
#include "rocksdb/compaction_filter.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -10425,6 +10426,95 @@ TEST(DBTest, MutexWaitStats) {
|
||||
ThreadStatus::STATE_MUTEX_WAIT, 0);
|
||||
}
|
||||
|
||||
// This reproduces a bug where we don't delete a file because when it was
|
||||
// supposed to be deleted, it was blocked by pending_outputs
|
||||
// Consider:
|
||||
// 1. current file_number is 13
|
||||
// 2. compaction (1) starts, blocks deletion of all files starting with 13
|
||||
// (pending outputs)
|
||||
// 3. file 13 is created by compaction (2)
|
||||
// 4. file 13 is consumed by compaction (3) and file 15 was created. Since file
|
||||
// 13 has no references, it is put into VersionSet::obsolete_files_
|
||||
// 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13
|
||||
// is deleted from obsolete_files_ set.
|
||||
// 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by
|
||||
// pending outputs since compaction (1) is still running. It is not deleted and
|
||||
// it is not present in obsolete_files_ anymore. Therefore, we never delete it.
|
||||
TEST(DBTest, DeleteObsoleteFilesPendingOutputs) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.write_buffer_size = 2 * 1024 * 1024; // 2 MB
|
||||
options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
|
||||
options.level0_file_num_compaction_trigger =
|
||||
2; // trigger compaction when we have 2 files
|
||||
options.max_background_flushes = 2;
|
||||
options.max_background_compactions = 2;
|
||||
Reopen(options);
|
||||
|
||||
Random rnd(301);
|
||||
// Create two 1MB sst files
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
// Create 1MB sst file
|
||||
for (int j = 0; j < 100; ++j) {
|
||||
ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
// this should execute both L0->L1 and L1->(move)->L2 compactions
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("0,0,1", FilesPerLevel(0));
|
||||
|
||||
SleepingBackgroundTask blocking_thread;
|
||||
port::Mutex mutex_;
|
||||
bool already_blocked(false);
|
||||
|
||||
// block the flush
|
||||
std::function<void()> block_first_time = [&]() {
|
||||
bool blocking = false;
|
||||
{
|
||||
MutexLock l(&mutex_);
|
||||
if (!already_blocked) {
|
||||
blocking = true;
|
||||
already_blocked = true;
|
||||
}
|
||||
}
|
||||
if (blocking) {
|
||||
blocking_thread.DoSleep();
|
||||
}
|
||||
};
|
||||
env_->table_write_callback_ = &block_first_time;
|
||||
// Create 1MB sst file
|
||||
for (int j = 0; j < 256; ++j) {
|
||||
ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024)));
|
||||
}
|
||||
// this should trigger a flush, which is blocked with block_first_time
|
||||
// pending_file is protecting all the files created after
|
||||
|
||||
ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));
|
||||
|
||||
ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
|
||||
std::vector<LiveFileMetaData> metadata;
|
||||
db_->GetLiveFilesMetaData(&metadata);
|
||||
ASSERT_EQ(metadata.size(), 1U);
|
||||
auto file_on_L2 = metadata[0].name;
|
||||
|
||||
ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr));
|
||||
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
|
||||
|
||||
// finish the flush!
|
||||
blocking_thread.WakeUp();
|
||||
blocking_thread.WaitUntilDone();
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
ASSERT_EQ("1,0,0,0,1", FilesPerLevel(0));
|
||||
|
||||
metadata.clear();
|
||||
db_->GetLiveFilesMetaData(&metadata);
|
||||
ASSERT_EQ(metadata.size(), 2U);
|
||||
|
||||
// This file should have been deleted
|
||||
ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + file_on_L2));
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -2772,9 +2772,17 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
}
|
||||
}
|
||||
|
||||
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files) {
|
||||
files->insert(files->end(), obsolete_files_.begin(), obsolete_files_.end());
|
||||
obsolete_files_.clear();
|
||||
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
||||
uint64_t min_pending_output) {
|
||||
std::vector<FileMetaData*> pending_files;
|
||||
for (auto f : obsolete_files_) {
|
||||
if (f->fd.GetNumber() < min_pending_output) {
|
||||
files->push_back(f);
|
||||
} else {
|
||||
pending_files.push_back(f);
|
||||
}
|
||||
}
|
||||
obsolete_files_.swap(pending_files);
|
||||
}
|
||||
|
||||
ColumnFamilyData* VersionSet::CreateColumnFamily(
|
||||
|
@ -590,7 +590,8 @@ class VersionSet {
|
||||
|
||||
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
|
||||
|
||||
void GetObsoleteFiles(std::vector<FileMetaData*>* files);
|
||||
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
|
||||
uint64_t min_pending_output);
|
||||
|
||||
ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); }
|
||||
const EnvOptions& env_options() { return env_options_; }
|
||||
|
Loading…
x
Reference in New Issue
Block a user