Merge branch 'master' of github.com:facebook/rocksdb
This commit is contained in:
commit
8f679c2901
@ -430,9 +430,10 @@ void DBImpl::MaybeDumpStats() {
|
||||
}
|
||||
}
|
||||
|
||||
// * Returns the list of live files in 'sst_live'
|
||||
// If it's doing full scan:
|
||||
// * Returns the list of live files in 'full_scan_sst_live' and the list
|
||||
// of all files in the filesystem in 'full_scan_candidate_files'.
|
||||
// * Returns the list of all files in the filesystem in
|
||||
// 'full_scan_candidate_files'.
|
||||
// Otherwise, gets obsolete files from VersionSet.
|
||||
// no_full_scan = true -- never do the full scan using GetChildren()
|
||||
// force = false -- don't force the full scan, except every
|
||||
@ -440,7 +441,6 @@ void DBImpl::MaybeDumpStats() {
|
||||
// force = true -- force the full scan
|
||||
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
bool no_full_scan) {
|
||||
// TODO(icanadi) clean up FindObsoleteFiles, no need to do full scans anymore
|
||||
mutex_.AssertHeld();
|
||||
|
||||
// if deletion is disabled, do nothing
|
||||
@ -482,13 +482,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
versions_->AddLiveFiles(&job_context->sst_live);
|
||||
if (doing_the_full_scan) {
|
||||
// Here we find all files in the DB directory and all the live files. In the
|
||||
// DeleteObsoleteFiles(), we will calculate a set difference (all_files -
|
||||
// live_files) and delete all files in that difference. If we're not doing
|
||||
// the full scan we don't need to get live files, because all files returned
|
||||
// by GetObsoleteFiles() will be dead (and need to be deleted)
|
||||
versions_->AddLiveFiles(&job_context->full_scan_sst_live);
|
||||
for (uint32_t path_id = 0; path_id < db_options_.db_paths.size();
|
||||
path_id++) {
|
||||
// set of all files in the directory. We'll exclude files that are still
|
||||
@ -554,7 +549,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
|
||||
// Now, convert live list to an unordered map, WITHOUT mutex held;
|
||||
// set is slow.
|
||||
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
|
||||
for (const FileDescriptor& fd : state.full_scan_sst_live) {
|
||||
for (const FileDescriptor& fd : state.sst_live) {
|
||||
sst_live_map[fd.GetNumber()] = &fd;
|
||||
}
|
||||
|
||||
@ -1566,7 +1561,6 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
|
||||
VersionEdit edit;
|
||||
edit.SetColumnFamily(cfd->GetID());
|
||||
for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) {
|
||||
f->moved = true;
|
||||
edit.DeleteFile(level, f->fd.GetNumber());
|
||||
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
@ -2223,7 +2217,6 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
|
||||
// Move file to next level
|
||||
assert(c->num_input_files(0) == 1);
|
||||
FileMetaData* f = c->input(0, 0);
|
||||
f->moved = true;
|
||||
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
|
||||
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
@ -3885,23 +3878,10 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
||||
const Options& soptions(SanitizeOptions(dbname, &comparator, options));
|
||||
Env* env = soptions.env;
|
||||
std::vector<std::string> filenames;
|
||||
std::vector<std::string> archiveFiles;
|
||||
|
||||
std::string archivedir = ArchivalDirectory(dbname);
|
||||
// Ignore error in case directory does not exist
|
||||
env->GetChildren(dbname, &filenames);
|
||||
|
||||
if (dbname != soptions.wal_dir) {
|
||||
std::vector<std::string> logfilenames;
|
||||
env->GetChildren(soptions.wal_dir, &logfilenames);
|
||||
filenames.insert(filenames.end(), logfilenames.begin(), logfilenames.end());
|
||||
archivedir = ArchivalDirectory(soptions.wal_dir);
|
||||
}
|
||||
|
||||
if (filenames.empty()) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
FileLock* lock;
|
||||
const std::string lockname = LockFileName(dbname);
|
||||
Status result = env->LockFile(lockname, &lock);
|
||||
@ -3915,8 +3895,6 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
||||
Status del;
|
||||
if (type == kMetaDatabase) {
|
||||
del = DestroyDB(dbname + "/" + filenames[i], options);
|
||||
} else if (type == kLogFile) {
|
||||
del = env->DeleteFile(soptions.wal_dir + "/" + filenames[i]);
|
||||
} else {
|
||||
del = env->DeleteFile(dbname + "/" + filenames[i]);
|
||||
}
|
||||
@ -3939,6 +3917,24 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::string> walDirFiles;
|
||||
std::string archivedir = ArchivalDirectory(dbname);
|
||||
if (dbname != soptions.wal_dir) {
|
||||
env->GetChildren(soptions.wal_dir, &walDirFiles);
|
||||
archivedir = ArchivalDirectory(soptions.wal_dir);
|
||||
}
|
||||
|
||||
// Delete log files in the WAL dir
|
||||
for (const auto& file : walDirFiles) {
|
||||
if (ParseFileName(file, &number, &type) && type == kLogFile) {
|
||||
Status del = env->DeleteFile(soptions.wal_dir + "/" + file);
|
||||
if (result.ok() && !del.ok()) {
|
||||
result = del;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::string> archiveFiles;
|
||||
env->GetChildren(archivedir, &archiveFiles);
|
||||
// Delete archival files.
|
||||
for (size_t i = 0; i < archiveFiles.size(); ++i) {
|
||||
|
@ -624,7 +624,7 @@ class DBTest {
|
||||
options.db_log_dir = test::TmpDir(env_);
|
||||
break;
|
||||
case kWalDirAndMmapReads:
|
||||
options.wal_dir = test::TmpDir(env_) + "/wal";
|
||||
options.wal_dir = dbname_ + "/wal";
|
||||
// mmap reads should be orthogonal to WalDir setting, so we piggyback to
|
||||
// this option config to test mmap reads as well
|
||||
options.allow_mmap_reads = true;
|
||||
@ -2595,8 +2595,9 @@ TEST(DBTest, IgnoreRecoveredLog) {
|
||||
Options options = CurrentOptions();
|
||||
options.create_if_missing = true;
|
||||
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
|
||||
options.wal_dir = dbname_ + "/logs";
|
||||
DestroyAndReopen(options);
|
||||
options.wal_dir = dbname_ + "/wal";
|
||||
Destroy(options);
|
||||
Reopen(options);
|
||||
|
||||
// fill up the DB
|
||||
std::string one, two;
|
||||
@ -10255,6 +10256,80 @@ TEST(DBTest, DontDeleteMovedFile) {
|
||||
Reopen(options);
|
||||
}
|
||||
|
||||
TEST(DBTest, DeleteMovedFileAfterCompaction) {
|
||||
// iter 1 -- delete_obsolete_files_period_micros == 0
|
||||
for (int iter = 0; iter < 2; ++iter) {
|
||||
// This test triggers move compaction and verifies that the file is not
|
||||
// deleted when it's part of move compaction
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
if (iter == 1) {
|
||||
options.delete_obsolete_files_period_micros = 0;
|
||||
}
|
||||
options.create_if_missing = true;
|
||||
options.level0_file_num_compaction_trigger =
|
||||
2; // trigger compaction when we have 2 files
|
||||
DestroyAndReopen(options);
|
||||
|
||||
Random rnd(301);
|
||||
// Create two 1MB sst files
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
// Create 1MB sst file
|
||||
for (int j = 0; j < 100; ++j) {
|
||||
ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
// this should execute L0->L1
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
ASSERT_EQ("0,1", FilesPerLevel(0));
|
||||
|
||||
// block compactions
|
||||
SleepingBackgroundTask sleeping_task;
|
||||
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task,
|
||||
Env::Priority::LOW);
|
||||
|
||||
options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
|
||||
Reopen(options);
|
||||
std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
|
||||
ASSERT_EQ("0,1", FilesPerLevel(0));
|
||||
// let compactions go
|
||||
sleeping_task.WakeUp();
|
||||
sleeping_task.WaitUntilDone();
|
||||
|
||||
// this should execute L1->L2 (move)
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
ASSERT_EQ("0,0,1", FilesPerLevel(0));
|
||||
|
||||
std::vector<LiveFileMetaData> metadata;
|
||||
db_->GetLiveFilesMetaData(&metadata);
|
||||
ASSERT_EQ(metadata.size(), 1U);
|
||||
auto moved_file_name = metadata[0].name;
|
||||
|
||||
// Create two more 1MB sst files
|
||||
for (int i = 0; i < 2; ++i) {
|
||||
// Create 1MB sst file
|
||||
for (int j = 0; j < 100; ++j) {
|
||||
ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024)));
|
||||
}
|
||||
ASSERT_OK(Flush());
|
||||
}
|
||||
// this should execute both L0->L1 and L1->L2 (merge with previous file)
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
|
||||
ASSERT_EQ("0,0,2", FilesPerLevel(0));
|
||||
|
||||
// iterator is holding the file
|
||||
ASSERT_TRUE(env_->FileExists(dbname_ + "/" + moved_file_name));
|
||||
|
||||
iterator.reset();
|
||||
|
||||
// this file should have been compacted away
|
||||
ASSERT_TRUE(!env_->FileExists(dbname_ + "/" + moved_file_name));
|
||||
}
|
||||
}
|
||||
|
||||
TEST(DBTest, EncodeDecompressedBlockSizeTest) {
|
||||
// iter 0 -- zlib
|
||||
// iter 1 -- bzip2
|
||||
|
@ -43,8 +43,7 @@ struct JobContext {
|
||||
std::vector<CandidateFileInfo> full_scan_candidate_files;
|
||||
|
||||
// the list of all live sst files that cannot be deleted
|
||||
// (filled only if we're doing full scan)
|
||||
std::vector<FileDescriptor> full_scan_sst_live;
|
||||
std::vector<FileDescriptor> sst_live;
|
||||
|
||||
// a list of sst files that we need to delete
|
||||
std::vector<FileMetaData*> sst_delete_files;
|
||||
|
@ -215,7 +215,6 @@ class VersionBuilder::Rep {
|
||||
const int level = new_file.first;
|
||||
FileMetaData* f = new FileMetaData(new_file.second);
|
||||
f->refs = 1;
|
||||
f->moved = false;
|
||||
|
||||
assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
|
||||
levels_[level].added_files.end());
|
||||
|
@ -87,10 +87,6 @@ struct FileMetaData {
|
||||
bool init_stats_from_file; // true if the data-entry stats of this file
|
||||
// has initialized from file.
|
||||
|
||||
// Always false for new files. Set to true if the file was part of move
|
||||
// compaction. Can only be mutated from the compaction process, under DB mutex
|
||||
bool moved;
|
||||
|
||||
FileMetaData()
|
||||
: refs(0),
|
||||
being_compacted(false),
|
||||
@ -100,8 +96,7 @@ struct FileMetaData {
|
||||
num_deletions(0),
|
||||
raw_key_size(0),
|
||||
raw_value_size(0),
|
||||
init_stats_from_file(false),
|
||||
moved(false) {}
|
||||
init_stats_from_file(false) {}
|
||||
};
|
||||
|
||||
// A compressed copy of file meta data that just contain
|
||||
|
@ -309,13 +309,7 @@ Version::~Version() {
|
||||
cfd_->table_cache()->ReleaseHandle(f->table_reader_handle);
|
||||
f->table_reader_handle = nullptr;
|
||||
}
|
||||
if (!f->moved) {
|
||||
vset_->obsolete_files_.push_back(f);
|
||||
} else {
|
||||
// moved!
|
||||
// TODO(icanadi) delete this outside of mutex
|
||||
delete f;
|
||||
}
|
||||
vset_->obsolete_files_.push_back(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -204,7 +204,11 @@ DBOptions::DBOptions()
|
||||
env(Env::Default()),
|
||||
rate_limiter(nullptr),
|
||||
info_log(nullptr),
|
||||
#ifdef NDEBUG
|
||||
info_log_level(INFO_LEVEL),
|
||||
#else
|
||||
info_log_level(DEBUG_LEVEL),
|
||||
#endif // NDEBUG
|
||||
max_open_files(5000),
|
||||
max_total_wal_size(0),
|
||||
statistics(nullptr),
|
||||
|
@ -54,6 +54,7 @@ struct StateInfo {
|
||||
// rows in this global table.
|
||||
static StateInfo global_state_table[] = {
|
||||
{ThreadStatus::STATE_UNKNOWN, ""},
|
||||
{ThreadStatus::STATE_MUTEX_WAIT, "Mutex Wait"},
|
||||
};
|
||||
|
||||
#else
|
||||
|
Loading…
x
Reference in New Issue
Block a user