Speed up FindObsoleteFiles()
Summary:
There are two versions of FindObsoleteFiles():
* full scan, which is executed every 6 hours (and it's terribly slow)
* no full scan, which is executed every time a background process finishes and an iterator is deleted

This diff optimizes the second case (no full scan). Here's what we do before the diff:
* Get the list of obsolete files (files with ref == 0). Some files in the obsolete_files set might actually be live.
* Get the list of live files to avoid deleting files that are live.
* Delete files that are in obsolete_files and not in live_files.

After this diff:
* The only files with ref == 0 that are still live are files that have been part of a move compaction. Don't include moved files in obsolete_files.
* Get the list of obsolete files (which excludes moved files).
* No need to get the list of live files, since all files in obsolete_files need to be deleted.

I'll post the benchmark results, but you can get a feel for it here: https://reviews.facebook.net/D30123

This depends on D30123.

P.S. We should do the full scan only in failure scenarios, not every 6 hours. I'll do this in a follow-up diff.

Test Plan:
One new unit test. Made sure that the unit test fails if we don't have the `if (!f->moved)` safeguard in ~Version.

make check

Big number of compactions and flushes:
./db_stress --threads=30 --ops_per_thread=20000000 --max_key=10000 --column_families=20 --clear_column_family_one_in=10000000 --verify_before_write=0 --reopen=15 --max_background_compactions=10 --max_background_flushes=10 --db=/fast-rocksdb-tmp/db_stress --prefixpercent=0 --iterpercent=0 --writepercent=75 --db_write_buffer_size=2000000

Reviewers: yhchiang, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D30249
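To make the before/after concrete, here is a minimal, self-contained C++ sketch of the collection rule described in the Summary. `FileMeta`, `ObsoleteBefore`, and `ObsoleteAfter` are hypothetical stand-ins for illustration only, not RocksDB's actual `FileMetaData`/`VersionSet` API; the real code manipulates ref counts under the DB mutex.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical stand-in for RocksDB's FileMetaData; only the fields
// relevant to obsolete-file collection are modeled here.
struct FileMeta {
  uint64_t number;
  int refs;
  bool moved;  // true if the file was part of a move compaction
};

// Before this diff: any ref==0 file is a candidate, so the caller must
// also build the live-file set and subtract it before deleting.
std::vector<uint64_t> ObsoleteBefore(const std::vector<FileMeta>& files,
                                     const std::set<uint64_t>& live) {
  std::vector<uint64_t> to_delete;
  for (const FileMeta& f : files) {
    if (f.refs == 0 && live.count(f.number) == 0) {
      to_delete.push_back(f.number);
    }
  }
  return to_delete;
}

// After this diff: moved files never enter the obsolete set, so every
// collected file is dead and the live-file list is unnecessary.
std::vector<uint64_t> ObsoleteAfter(const std::vector<FileMeta>& files) {
  std::vector<uint64_t> to_delete;
  for (const FileMeta& f : files) {
    if (f.refs == 0 && !f.moved) {
      to_delete.push_back(f.number);
    }
  }
  return to_delete;
}

int main() {
  // File 9 was moved to another level: its ref count dropped to 0 in the
  // old Version, but it is still live. File 11 is still referenced.
  std::vector<FileMeta> files = {{7, 0, false}, {9, 0, true}, {11, 1, false}};
  std::set<uint64_t> live = {9, 11};
  // Both variants must delete only file 7; the new one needs no live set.
  assert(ObsoleteBefore(files, live) == std::vector<uint64_t>{7});
  assert(ObsoleteAfter(files) == std::vector<uint64_t>{7});
  return 0;
}
```

The key invariant is that a ref == 0 file can only still be live if it was part of a move compaction, so filtering on `moved` at collection time makes the live-list lookup redundant.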
parent d8c4ce6b50
commit 0acc738810
@@ -7,6 +7,7 @@
   specifying them in db_paths along with the target_size.
   Lower numbered levels will be placed earlier in the db_paths and higher
   numbered levels will be placed later in the db_paths vector.
+* Potentially big performance improvements if you're using RocksDB with lots of column families (100-1000)

 ### 3.9.0 (12/8/2014)

@@ -367,9 +367,8 @@ DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
              "deletepercent), so deletepercent must be smaller than (100 - "
              "FLAGS_readwritepercent)");

-DEFINE_uint64(delete_obsolete_files_period_micros, 0, "Option to delete "
-              "obsolete files periodically. 0 means that obsolete files are"
-              " deleted after every compaction run.");
+DEFINE_uint64(delete_obsolete_files_period_micros, 0,
+              "Ignored. Left here for backward compatibility");

 namespace {
 enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
@@ -2008,8 +2007,6 @@ class Benchmark {
         options.compression_per_level[i] = FLAGS_compression_type_e;
       }
     }
-    options.delete_obsolete_files_period_micros =
-        FLAGS_delete_obsolete_files_period_micros;
     options.soft_rate_limit = FLAGS_soft_rate_limit;
     options.hard_rate_limit = FLAGS_hard_rate_limit;
     options.rate_limit_delay_max_milliseconds =

@@ -213,7 +213,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       bg_flush_scheduled_(0),
       manual_compaction_(nullptr),
       disable_delete_obsolete_files_(0),
-      delete_obsolete_files_last_run_(options.env->NowMicros()),
+      delete_obsolete_files_next_run_(
+          options.env->NowMicros() +
+          db_options_.delete_obsolete_files_period_micros),
       last_stats_dump_time_microsec_(0),
       flush_on_destroy_(false),
       env_options_(options),
@@ -421,14 +423,17 @@ void DBImpl::MaybeDumpStats() {
   }
 }

-// Returns the list of live files in 'sst_live' and the list
-// of all files in the filesystem in 'candidate_files'.
+// If it's doing full scan:
+// * Returns the list of live files in 'full_scan_sst_live' and the list
+//   of all files in the filesystem in 'full_scan_candidate_files'.
+// Otherwise, gets obsolete files from VersionSet.
 // no_full_scan = true -- never do the full scan using GetChildren()
 // force = false -- don't force the full scan, except every
 //  db_options_.delete_obsolete_files_period_micros
 // force = true -- force the full scan
 void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
                                bool no_full_scan) {
+  // TODO(icanadi) clean up FindObsoleteFiles, no need to do full scans anymore
   mutex_.AssertHeld();

   // if deletion is disabled, do nothing
@@ -445,10 +450,10 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
     doing_the_full_scan = true;
   } else {
     const uint64_t now_micros = env_->NowMicros();
-    if (delete_obsolete_files_last_run_ +
-        db_options_.delete_obsolete_files_period_micros < now_micros) {
+    if (delete_obsolete_files_next_run_ < now_micros) {
       doing_the_full_scan = true;
-      delete_obsolete_files_last_run_ = now_micros;
+      delete_obsolete_files_next_run_ =
+          now_micros + db_options_.delete_obsolete_files_period_micros;
     }
   }

@@ -462,13 +467,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
   job_context->log_number = versions_->MinLogNumber();
   job_context->prev_log_number = versions_->prev_log_number();

-  if (!doing_the_full_scan && !job_context->HaveSomethingToDelete()) {
-    // avoid filling up sst_live if we're sure that we
-    // are not going to do the full scan and that we don't have
-    // anything to delete at the moment
-    return;
-  }
-
   // don't delete live files
   if (pending_outputs_.size()) {
     job_context->min_pending_output = *pending_outputs_.begin();
@@ -476,11 +474,16 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
     // delete all of them
     job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
   }
-  versions_->AddLiveFiles(&job_context->sst_live);

   if (doing_the_full_scan) {
-    for (uint32_t path_id = 0;
-         path_id < db_options_.db_paths.size(); path_id++) {
+    // Here we find all files in the DB directory and all the live files. In the
+    // DeleteObsoleteFiles(), we will calculate a set difference (all_files -
+    // live_files) and delete all files in that difference. If we're not doing
+    // the full scan we don't need to get live files, because all files returned
+    // by GetObsoleteFiles() will be dead (and need to be deleted)
+    versions_->AddLiveFiles(&job_context->full_scan_sst_live);
+    for (uint32_t path_id = 0; path_id < db_options_.db_paths.size();
+         path_id++) {
       // set of all files in the directory. We'll exclude files that are still
       // alive in the subsequent processings.
       std::vector<std::string> files;
@@ -488,7 +491,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
                         &files);  // Ignore errors
       for (std::string file : files) {
         // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
-        job_context->candidate_files.emplace_back("/" + file, path_id);
+        job_context->full_scan_candidate_files.emplace_back("/" + file,
+                                                            path_id);
       }
     }

@@ -497,7 +501,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       std::vector<std::string> log_files;
       env_->GetChildren(db_options_.wal_dir, &log_files);  // Ignore errors
       for (std::string log_file : log_files) {
-        job_context->candidate_files.emplace_back(log_file, 0);
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
       }
     }
     // Add info log files in db_log_dir
@@ -506,7 +510,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
       // Ignore errors
       env_->GetChildren(db_options_.db_log_dir, &info_log_files);
       for (std::string log_file : info_log_files) {
-        job_context->candidate_files.emplace_back(log_file, 0);
+        job_context->full_scan_candidate_files.emplace_back(log_file, 0);
       }
     }
   }
@@ -543,11 +547,11 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
   // Now, convert live list to an unordered map, WITHOUT mutex held;
   // set is slow.
   std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
-  for (const FileDescriptor& fd : state.sst_live) {
+  for (const FileDescriptor& fd : state.full_scan_sst_live) {
     sst_live_map[fd.GetNumber()] = &fd;
   }

-  auto candidate_files = state.candidate_files;
+  auto candidate_files = state.full_scan_candidate_files;
   candidate_files.reserve(candidate_files.size() +
                           state.sst_delete_files.size() +
                           state.log_delete_files.size());
@@ -1491,6 +1495,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
     VersionEdit edit;
     edit.SetColumnFamily(cfd->GetID());
     for (const auto& f : cfd->current()->storage_info()->LevelFiles(level)) {
+      f->moved = true;
       edit.DeleteFile(level, f->fd.GetNumber());
       edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                    f->fd.GetFileSize(), f->smallest, f->largest,
@@ -2137,6 +2142,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
     // Move file to next level
     assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);
+    f->moved = true;
     c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
     c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
                        f->fd.GetFileSize(), f->smallest, f->largest,

@@ -532,8 +532,8 @@ class DBImpl : public DB {
   // without any synchronization
   int disable_delete_obsolete_files_;

-  // last time when DeleteObsoleteFiles was invoked
-  uint64_t delete_obsolete_files_last_run_;
+  // next time when we should run DeleteObsoleteFiles with full scan
+  uint64_t delete_obsolete_files_next_run_;

   // last time stats were dumped to LOG
   std::atomic<uint64_t> last_stats_dump_time_microsec_;

@@ -85,7 +85,7 @@ static bool LZ4HCCompressionSupported(const CompressionOptions &options) {
   return port::LZ4HC_Compress(options, in.data(), in.size(), &out);
 }

-static std::string RandomString(Random *rnd, int len) {
+static std::string RandomString(Random* rnd, int len) {
   std::string r;
   test::RandomString(rnd, len, &r);
   return r;
@@ -9993,6 +9993,36 @@ TEST(DBTest, DontDeletePendingOutputs) {
   Compact("a", "b");
 }

+TEST(DBTest, DontDeleteMovedFile) {
+  // This test triggers move compaction and verifies that the file is not
+  // deleted when it's part of move compaction
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.create_if_missing = true;
+  options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
+  options.level0_file_num_compaction_trigger =
+      2;  // trigger compaction when we have 2 files
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  // Create two 1MB sst files
+  for (int i = 0; i < 2; ++i) {
+    // Create 1MB sst file
+    for (int j = 0; j < 100; ++j) {
+      ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+  // this should execute both L0->L1 and L1->(move)->L2 compactions
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ("0,0,1", FilesPerLevel(0));
+
+  // If the moved file is actually deleted (the move-safeguard in
+  // ~Version::Version() is not there), we get this failure:
+  // Corruption: Can't access /000009.sst
+  Reopen(options);
+}
+
 } // namespace rocksdb

 int main(int argc, char** argv) {

@@ -20,7 +20,7 @@ class MemTable;

 struct JobContext {
   inline bool HaveSomethingToDelete() const {
-    return candidate_files.size() || sst_delete_files.size() ||
+    return full_scan_candidate_files.size() || sst_delete_files.size() ||
            log_delete_files.size() || new_superversion != nullptr ||
            superversions_to_free.size() > 0 || memtables_to_free.size() > 0;
   }
@@ -39,10 +39,12 @@ struct JobContext {
   // a list of all files that we'll consider deleting
   // (every once in a while this is filled up with all files
   // in the DB directory)
-  std::vector<CandidateFileInfo> candidate_files;
+  // (filled only if we're doing full scan)
+  std::vector<CandidateFileInfo> full_scan_candidate_files;

   // the list of all live sst files that cannot be deleted
-  std::vector<FileDescriptor> sst_live;
+  // (filled only if we're doing full scan)
+  std::vector<FileDescriptor> full_scan_sst_live;

   // a list of sst files that we need to delete
   std::vector<FileMetaData*> sst_delete_files;

@@ -206,6 +206,7 @@ class VersionBuilder::Rep {
       const int level = new_file.first;
       FileMetaData* f = new FileMetaData(new_file.second);
       f->refs = 1;
+      f->moved = false;

       assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
              levels_[level].added_files.end());

@@ -85,6 +85,10 @@ struct FileMetaData {
   bool init_stats_from_file;   // true if the data-entry stats of this file
                                // has initialized from file.

+  // Always false for new files. Set to true if the file was part of move
+  // compaction. Can only be mutated from the compaction process, under DB mutex
+  bool moved;
+
   FileMetaData()
       : refs(0),
         being_compacted(false),
@@ -94,7 +98,8 @@ struct FileMetaData {
         num_deletions(0),
         raw_key_size(0),
         raw_value_size(0),
-        init_stats_from_file(false) {}
+        init_stats_from_file(false),
+        moved(false) {}
 };

 // A compressed copy of file meta data that just contain

@@ -309,7 +309,13 @@ Version::~Version() {
         cfd_->table_cache()->ReleaseHandle(f->table_reader_handle);
         f->table_reader_handle = nullptr;
       }
-      vset_->obsolete_files_.push_back(f);
+      if (!f->moved) {
+        vset_->obsolete_files_.push_back(f);
+      } else {
+        // moved!
+        // TODO(icanadi) delete this outside of mutex
+        delete f;
+      }
     }
   }
 }