Sync parent directory after deleting a file in delete scheduler

Summary:
sync parent directory after deleting a file in delete scheduler. Otherwise, trim speed may not be as smooth as what we want.
Closes https://github.com/facebook/rocksdb/pull/3767

Differential Revision: D7760136

Pulled By: siying

fbshipit-source-id: ec131d53b61953f09c60d67e901e5eeb2716b05f
This commit is contained in:
Siying Dong 2018-04-26 13:51:39 -07:00 committed by Facebook Github Bot
parent 7e4e381495
commit 63c965cdb4
12 changed files with 95 additions and 43 deletions

View File

@ -11,6 +11,7 @@
* TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki.
* Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage.
* Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks.
* Fsync parent directory after deleting a file in delete scheduler.
### Bug Fixes
* Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.

View File

@ -871,13 +871,14 @@ void DBImpl::BackgroundCallPurge() {
if (!purge_queue_.empty()) {
auto purge_file = purge_queue_.begin();
auto fname = purge_file->fname;
auto dir_to_sync = purge_file->dir_to_sync;
auto type = purge_file->type;
auto number = purge_file->number;
auto job_id = purge_file->job_id;
purge_queue_.pop_front();
mutex_.Unlock();
DeleteObsoleteFileImpl(job_id, fname, type, number);
DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
mutex_.Lock();
} else {
assert(!logs_to_free_queue_.empty());
@ -2443,7 +2444,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile) {
del = DeleteSSTFile(&soptions, path_to_delete);
del = DeleteSSTFile(&soptions, path_to_delete, dbname);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -2478,7 +2479,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
if (ParseFileName(filenames[i], &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
std::string table_path = path + "/" + filenames[i];
Status del = DeleteSSTFile(&soptions, table_path);
Status del = DeleteSSTFile(&soptions, table_path, path);
if (result.ok() && !del.ok()) {
result = del;
}

View File

@ -798,7 +798,8 @@ class DBImpl : public DB {
void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion
void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
FileType type, uint64_t number);
const std::string& path_to_sync, FileType type,
uint64_t number);
// Background process needs to call
// auto x = CaptureCurrentFileNumberInPendingOutputs()
@ -919,8 +920,8 @@ class DBImpl : public DB {
void MaybeScheduleFlushOrCompaction();
void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
int job_id);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id);
static void BGWorkCompaction(void* arg);
// Runs a pre-chosen universal compaction involving bottom level in a
// separate, bottom-pri thread pool.
@ -1164,11 +1165,13 @@ class DBImpl : public DB {
// purge_queue_
struct PurgeFileInfo {
std::string fname;
std::string dir_to_sync;
FileType type;
uint64_t number;
int job_id;
PurgeFileInfo(std::string fn, FileType t, uint64_t num, int jid)
: fname(fn), type(t), number(num), job_id(jid) {}
PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
int jid)
: fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
};
// flush_queue_ and compaction_queue_ hold column families that we need to

View File

@ -1320,10 +1320,10 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
}
}
void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
uint64_t number, int job_id) {
void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id) {
mutex_.AssertHeld();
PurgeFileInfo file_info(fname, type, number, job_id);
PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
purge_queue_.push_back(std::move(file_info));
}

View File

@ -351,11 +351,12 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
// Delete obsolete files and log status and information of file deletion
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
const std::string& path_to_sync,
FileType type, uint64_t number) {
Status file_deletion_status;
if (type == kTableFile) {
file_deletion_status =
DeleteSSTFile(&immutable_db_options_, fname);
DeleteSSTFile(&immutable_db_options_, fname, path_to_sync);
} else {
file_deletion_status = env_->DeleteFile(fname);
}
@ -518,13 +519,16 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
}
std::string fname;
std::string dir_to_sync;
if (type == kTableFile) {
// evict from cache
TableCache::Evict(table_cache_.get(), number);
fname = MakeTableFileName(candidate_file.file_path, number);
dir_to_sync = candidate_file.file_path;
} else {
fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
"/" + to_delete;
dir_to_sync =
(type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
fname = dir_to_sync + "/" + to_delete;
}
#ifndef ROCKSDB_LITE
@ -538,9 +542,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
Status file_deletion_status;
if (schedule_only) {
InstrumentedMutexLock guard_lock(&mutex_);
SchedulePendingPurge(fname, type, number, state.job_id);
SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
} else {
DeleteObsoleteFileImpl(state.job_id, fname, type, number);
DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
}
}

View File

@ -51,7 +51,8 @@ DeleteScheduler::~DeleteScheduler() {
}
}
Status DeleteScheduler::DeleteFile(const std::string& file_path) {
Status DeleteScheduler::DeleteFile(const std::string& file_path,
const std::string& dir_to_sync) {
Status s;
if (rate_bytes_per_sec_.load() <= 0 ||
total_trash_size_.load() >
@ -87,7 +88,7 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path) {
// Add file to delete queue
{
InstrumentedMutexLock l(&mu_);
queue_.push(trash_file);
queue_.emplace(trash_file, dir_to_sync);
pending_files_++;
if (pending_files_ == 1) {
cv_.SignalAll();
@ -128,7 +129,7 @@ Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
if (sfm) {
// We have an SstFileManager that will schedule the file delete
sfm->OnAddFile(trash_file);
file_delete = sfm->ScheduleFileDeletion(trash_file);
file_delete = sfm->ScheduleFileDeletion(trash_file, path);
} else {
// Delete the file immediately
file_delete = env->DeleteFile(trash_file);
@ -209,14 +210,16 @@ void DeleteScheduler::BackgroundEmptyTrash() {
}
// Get new file to delete
std::string path_in_trash = queue_.front();
const FileAndDir& fad = queue_.front();
std::string path_in_trash = fad.fname;
// We dont need to hold the lock while deleting the file
mu_.Unlock();
uint64_t deleted_bytes = 0;
bool is_complete = true;
// Delete file from trash and update total_penlty value
Status s = DeleteTrashFile(path_in_trash, &deleted_bytes, &is_complete);
Status s =
DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
total_deleted_bytes += deleted_bytes;
mu_.Lock();
if (is_complete) {
@ -254,6 +257,7 @@ void DeleteScheduler::BackgroundEmptyTrash() {
}
Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
const std::string& dir_to_sync,
uint64_t* deleted_bytes,
bool* is_complete) {
uint64_t file_size;
@ -286,6 +290,18 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
if (need_full_delete) {
s = env_->DeleteFile(path_in_trash);
if (!dir_to_sync.empty()) {
std::unique_ptr<Directory> dir_obj;
if (s.ok()) {
s = env_->NewDirectory(dir_to_sync, &dir_obj);
}
if (s.ok()) {
s = dir_obj->Fsync();
TEST_SYNC_POINT_CALLBACK(
"DeleteScheduler::DeleteTrashFile::AfterSyncDir",
reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
}
}
*deleted_bytes = file_size;
sst_file_manager_->OnDeleteFile(path_in_trash);
}

View File

@ -47,7 +47,7 @@ class DeleteScheduler {
}
// Mark file as trash directory and schedule it's deletion
Status DeleteFile(const std::string& fname);
Status DeleteFile(const std::string& fname, const std::string& dir_to_sync);
// Wait for all files being deleteing in the background to finish or for
// destructor to be called.
@ -82,6 +82,7 @@ class DeleteScheduler {
Status MarkAsTrash(const std::string& file_path, std::string* path_in_trash);
Status DeleteTrashFile(const std::string& path_in_trash,
const std::string& dir_to_sync,
uint64_t* deleted_bytes, bool* is_complete);
void BackgroundEmptyTrash();
@ -93,8 +94,15 @@ class DeleteScheduler {
std::atomic<int64_t> rate_bytes_per_sec_;
// Mutex to protect queue_, pending_files_, bg_errors_, closing_
InstrumentedMutex mu_;
struct FileAndDir {
FileAndDir(const std::string& f, const std::string& d) : fname(f), dir(d) {}
std::string fname;
std::string dir; // empty will be skipped.
};
// Queue of trash files that need to be deleted
std::queue<std::string> queue_;
std::queue<FileAndDir> queue_;
// Number of trash files that are waiting to be deleted
int32_t pending_files_;
uint64_t bytes_max_delete_chunk_;

View File

@ -127,6 +127,13 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
int dir_synced = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile::AfterSyncDir", [&](void* arg) {
dir_synced++;
std::string* dir = reinterpret_cast<std::string*>(arg);
EXPECT_EQ(dummy_files_dirs_[0], *dir);
});
int num_files = 100; // 100 files
uint64_t file_size = 1024; // every file is 1 kb
@ -141,6 +148,7 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
NewDeleteScheduler();
dir_synced = 0;
// Create 100 dummy files, every file is 1 Kb
std::vector<std::string> generated_files;
for (int i = 0; i < num_files; i++) {
@ -150,7 +158,8 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
// Delete dummy files and measure time spent to empty trash
for (int i = 0; i < num_files; i++) {
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i],
dummy_files_dirs_[0]));
}
ASSERT_EQ(CountNormalFiles(), 0);
@ -172,6 +181,8 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
}
ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
ASSERT_EQ(num_files, dir_synced);
ASSERT_EQ(CountTrashFiles(), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
@ -197,7 +208,7 @@ TEST_F(DeleteSchedulerTest, MultiDirectoryDeletionsScheduled) {
// Mark dummy files as trash
for (size_t i = 0; i < kNumFiles; i++) {
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
ASSERT_EQ(0, CountNormalFiles(i));
ASSERT_EQ(1, CountTrashFiles(i));
}
@ -260,7 +271,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
int range_start = idx * num_files;
int range_end = range_start + num_files;
for (int j = range_start; j < range_end; j++) {
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j]));
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j], ""));
}
};
@ -313,7 +324,7 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
for (int i = 0; i < 10; i++) {
// Every file we delete will be deleted immediately
std::string dummy_file = NewDummyFile("dummy.data");
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file));
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
ASSERT_TRUE(env_->FileExists(dummy_file).IsNotFound());
ASSERT_EQ(CountNormalFiles(), 0);
ASSERT_EQ(CountTrashFiles(), 0);
@ -343,7 +354,7 @@ TEST_F(DeleteSchedulerTest, ConflictNames) {
// Create "conflict.data" and move it to trash 10 times
for (int i = 0; i < 10; i++) {
std::string dummy_file = NewDummyFile("conflict.data");
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file));
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
}
ASSERT_EQ(CountNormalFiles(), 0);
// 10 files ("conflict.data" x 10) in trash
@ -379,7 +390,7 @@ TEST_F(DeleteSchedulerTest, BackgroundError) {
// Generate 10 dummy files and move them to trash
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
}
ASSERT_EQ(CountNormalFiles(), 0);
ASSERT_EQ(CountTrashFiles(), 10);
@ -421,7 +432,7 @@ TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {
// Generate 10 dummy files and move them to trash
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
}
ASSERT_EQ(CountNormalFiles(), 0);
delete_scheduler_->WaitForEmptyTrash();
@ -450,10 +461,13 @@ TEST_F(DeleteSchedulerTest, DeletePartialFile) {
NewDeleteScheduler();
// Should delete in 4 batch
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile("data_1", 500 * 1024)));
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile("data_2", 100 * 1024)));
ASSERT_OK(
delete_scheduler_->DeleteFile(NewDummyFile("data_1", 500 * 1024), ""));
ASSERT_OK(
delete_scheduler_->DeleteFile(NewDummyFile("data_2", 100 * 1024), ""));
// Should delete in 2 batch
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile("data_2", 200 * 1024)));
ASSERT_OK(
delete_scheduler_->DeleteFile(NewDummyFile("data_2", 200 * 1024), ""));
delete_scheduler_->WaitForEmptyTrash();
@ -481,7 +495,7 @@ TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) {
for (int i = 0; i < 100; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
}
// Deleting 100 files will need >28 hours to delete
@ -542,7 +556,7 @@ TEST_F(DeleteSchedulerTest, DISABLED_DynamicRateLimiting1) {
// Delete dummy files and measure time spent to empty trash
for (int i = 0; i < num_files; i++) {
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
}
ASSERT_EQ(CountNormalFiles(), 0);
@ -602,7 +616,7 @@ TEST_F(DeleteSchedulerTest, ImmediateDeleteOn25PercDBSize) {
}
for (std::string& file_name : generated_files) {
delete_scheduler_->DeleteFile(file_name);
delete_scheduler_->DeleteFile(file_name, "");
}
// When we end up with 26 files in trash we will start

View File

@ -82,16 +82,17 @@ Status CreateFile(Env* env, const std::string& destination,
}
Status DeleteSSTFile(const ImmutableDBOptions* db_options,
const std::string& fname) {
const std::string& fname, const std::string& dir_to_sync) {
#ifndef ROCKSDB_LITE
auto sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
if (sfm) {
return sfm->ScheduleFileDeletion(fname);
return sfm->ScheduleFileDeletion(fname, dir_to_sync);
} else {
return db_options->env->DeleteFile(fname);
}
#else
(void)dir_to_sync;
// SstFileManager is not supported in ROCKSDB_LITE
return db_options->env->DeleteFile(fname);
#endif

View File

@ -22,6 +22,7 @@ extern Status CreateFile(Env* env, const std::string& destination,
const std::string& contents);
extern Status DeleteSSTFile(const ImmutableDBOptions* db_options,
const std::string& fname);
const std::string& fname,
const std::string& path_to_sync);
} // namespace rocksdb

View File

@ -162,8 +162,9 @@ void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
return delete_scheduler_.SetMaxTrashDBRatio(r);
}
Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) {
return delete_scheduler_.DeleteFile(file_path);
Status SstFileManagerImpl::ScheduleFileDeletion(
const std::string& file_path, const std::string& path_to_sync) {
return delete_scheduler_.DeleteFile(file_path, path_to_sync);
}
void SstFileManagerImpl::WaitForEmptyTrash() {
@ -218,7 +219,8 @@ SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
std::string path_in_trash = trash_dir + "/" + trash_file;
res->OnAddFile(path_in_trash);
Status file_delete = res->ScheduleFileDeletion(path_in_trash);
Status file_delete =
res->ScheduleFileDeletion(path_in_trash, trash_dir);
if (s.ok() && !file_delete.ok()) {
s = file_delete;
}

View File

@ -94,7 +94,8 @@ class SstFileManagerImpl : public SstFileManager {
virtual void SetMaxTrashDBRatio(double ratio) override;
// Mark file as trash and schedule it's deletion.
virtual Status ScheduleFileDeletion(const std::string& file_path);
virtual Status ScheduleFileDeletion(const std::string& file_path,
const std::string& dir_to_sync);
// Wait for all files being deleteing in the background to finish or for
// destructor to be called.