From 63c965cdb4983342fe5bffa44bceb6d0bfc071fb Mon Sep 17 00:00:00 2001
From: Siying Dong
Date: Thu, 26 Apr 2018 13:51:39 -0700
Subject: [PATCH] Sync parent directory after deleting a file in delete
 scheduler

Summary:
Sync the parent directory after deleting a file in the delete scheduler.
Otherwise, the trash trimming speed may not be as smooth as intended.
Closes https://github.com/facebook/rocksdb/pull/3767

Differential Revision: D7760136

Pulled By: siying

fbshipit-source-id: ec131d53b61953f09c60d67e901e5eeb2716b05f
---
 HISTORY.md                     |  1 +
 db/db_impl.cc                  |  7 +++---
 db/db_impl.h                   | 13 ++++++-----
 db/db_impl_compaction_flush.cc |  6 ++---
 db/db_impl_files.cc            | 14 +++++++-----
 util/delete_scheduler.cc       | 26 +++++++++++++++++-----
 util/delete_scheduler.h        | 12 ++++++++--
 util/delete_scheduler_test.cc  | 40 +++++++++++++++++++++++-----------
 util/file_util.cc              |  5 +++--
 util/file_util.h               |  3 ++-
 util/sst_file_manager_impl.cc  |  8 ++++---
 util/sst_file_manager_impl.h   |  3 ++-
 12 files changed, 95 insertions(+), 43 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index c0809b919..0ecefece8 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -11,6 +11,7 @@
 * TransactionDBOptions::write_policy can be configured to enable WritePrepared 2PC transactions. Read more about them in the wiki.
 * Add DB properties "rocksdb.block-cache-capacity", "rocksdb.block-cache-usage", "rocksdb.block-cache-pinned-usage" to show block cache usage.
 * Add `Env::LowerThreadPoolCPUPriority(Priority)` method, which lowers the CPU priority of background (esp. compaction) threads to minimize interference with foreground tasks.
+* Fsync parent directory after deleting a file in delete scheduler.

 ### Bug Fixes
 * Fsync after writing global seq number to the ingestion file in ExternalSstFileIngestionJob.
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 1972f33b9..53c9a9ca7 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -871,13 +871,14 @@ void DBImpl::BackgroundCallPurge() {
     if (!purge_queue_.empty()) {
       auto purge_file = purge_queue_.begin();
       auto fname = purge_file->fname;
+      auto dir_to_sync = purge_file->dir_to_sync;
       auto type = purge_file->type;
       auto number = purge_file->number;
       auto job_id = purge_file->job_id;
       purge_queue_.pop_front();
       mutex_.Unlock();
-      DeleteObsoleteFileImpl(job_id, fname, type, number);
+      DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
       mutex_.Lock();
     } else {
       assert(!logs_to_free_queue_.empty());
@@ -2443,7 +2444,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
       if (type == kMetaDatabase) {
         del = DestroyDB(path_to_delete, options);
       } else if (type == kTableFile) {
-        del = DeleteSSTFile(&soptions, path_to_delete);
+        del = DeleteSSTFile(&soptions, path_to_delete, dbname);
       } else {
         del = env->DeleteFile(path_to_delete);
       }
@@ -2478,7 +2479,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
         if (ParseFileName(filenames[i], &number, &type) &&
             type == kTableFile) {  // Lock file will be deleted at end
           std::string table_path = path + "/" + filenames[i];
-          Status del = DeleteSSTFile(&soptions, table_path);
+          Status del = DeleteSSTFile(&soptions, table_path, path);
           if (result.ok() && !del.ok()) {
             result = del;
           }
diff --git a/db/db_impl.h b/db/db_impl.h
index f7af54349..25455f2b7 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -798,7 +798,8 @@ class DBImpl : public DB {
   void DeleteObsoleteFiles();
   // Delete obsolete files and log status and information of file deletion
   void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
-                              FileType type, uint64_t number);
+                              const std::string& path_to_sync, FileType type,
+                              uint64_t number);

   // Background process needs to call
   //     auto x = CaptureCurrentFileNumberInPendingOutputs()
@@ -919,8 +920,8 @@ class DBImpl : public DB {
   void MaybeScheduleFlushOrCompaction();
   void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason);
   void SchedulePendingCompaction(ColumnFamilyData* cfd);
-  void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
-                            int job_id);
+  void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
+                            FileType type, uint64_t number, int job_id);
   static void BGWorkCompaction(void* arg);
   // Runs a pre-chosen universal compaction involving bottom level in a
   // separate, bottom-pri thread pool.
@@ -1164,11 +1165,13 @@ class DBImpl : public DB {
   // purge_queue_
   struct PurgeFileInfo {
     std::string fname;
+    std::string dir_to_sync;
     FileType type;
     uint64_t number;
     int job_id;
-    PurgeFileInfo(std::string fn, FileType t, uint64_t num, int jid)
-        : fname(fn), type(t), number(num), job_id(jid) {}
+    PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
+                  int jid)
+        : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
   };

   // flush_queue_ and compaction_queue_ hold column families that we need to
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index 48ca4be89..066616ba0 100644
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -1320,10 +1320,10 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
   }
 }

-void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
-                                  uint64_t number, int job_id) {
+void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
+                                  FileType type, uint64_t number, int job_id) {
   mutex_.AssertHeld();
-  PurgeFileInfo file_info(fname, type, number, job_id);
+  PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
   purge_queue_.push_back(std::move(file_info));
 }

diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc
index ecb84f6f7..9892c5b9d 100644
--- a/db/db_impl_files.cc
+++ b/db/db_impl_files.cc
@@ -351,11 +351,12 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,

 // Delete obsolete files and log status and information of file deletion
 void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
+                                    const std::string& path_to_sync,
                                     FileType type, uint64_t number) {
   Status file_deletion_status;
   if (type == kTableFile) {
     file_deletion_status =
-        DeleteSSTFile(&immutable_db_options_, fname);
+        DeleteSSTFile(&immutable_db_options_, fname, path_to_sync);
   } else {
     file_deletion_status = env_->DeleteFile(fname);
   }
@@ -518,13 +519,16 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
     }

     std::string fname;
+    std::string dir_to_sync;
     if (type == kTableFile) {
       // evict from cache
       TableCache::Evict(table_cache_.get(), number);
       fname = MakeTableFileName(candidate_file.file_path, number);
+      dir_to_sync = candidate_file.file_path;
     } else {
-      fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
-              "/" + to_delete;
+      dir_to_sync =
+          (type == kLogFile) ? immutable_db_options_.wal_dir : dbname_;
+      fname = dir_to_sync + "/" + to_delete;
     }

 #ifndef ROCKSDB_LITE
@@ -538,9 +542,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
     Status file_deletion_status;
     if (schedule_only) {
       InstrumentedMutexLock guard_lock(&mutex_);
-      SchedulePendingPurge(fname, type, number, state.job_id);
+      SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
     } else {
-      DeleteObsoleteFileImpl(state.job_id, fname, type, number);
+      DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
     }
   }
diff --git a/util/delete_scheduler.cc b/util/delete_scheduler.cc
index 8b05a5c90..8ef66fdfa 100644
--- a/util/delete_scheduler.cc
+++ b/util/delete_scheduler.cc
@@ -51,7 +51,8 @@ DeleteScheduler::~DeleteScheduler() {
   }
 }

-Status DeleteScheduler::DeleteFile(const std::string& file_path) {
+Status DeleteScheduler::DeleteFile(const std::string& file_path,
+                                   const std::string& dir_to_sync) {
   Status s;
   if (rate_bytes_per_sec_.load() <= 0 ||
       total_trash_size_.load() >
@@ -87,7 +88,7 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path) {
   // Add file to delete queue
   {
     InstrumentedMutexLock l(&mu_);
-    queue_.push(trash_file);
+    queue_.emplace(trash_file, dir_to_sync);
     pending_files_++;
     if (pending_files_ == 1) {
       cv_.SignalAll();
@@ -128,7 +129,7 @@ Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
     if (sfm) {
       // We have an SstFileManager that will schedule the file delete
       sfm->OnAddFile(trash_file);
-      file_delete = sfm->ScheduleFileDeletion(trash_file);
+      file_delete = sfm->ScheduleFileDeletion(trash_file, path);
     } else {
       // Delete the file immediately
       file_delete = env->DeleteFile(trash_file);
@@ -209,14 +210,16 @@ void DeleteScheduler::BackgroundEmptyTrash() {
       }

       // Get new file to delete
-      std::string path_in_trash = queue_.front();
+      const FileAndDir& fad = queue_.front();
+      std::string path_in_trash = fad.fname;

       // We dont need to hold the lock while deleting the file
       mu_.Unlock();
       uint64_t deleted_bytes = 0;
       bool is_complete = true;
       // Delete file from trash and update total_penlty value
-      Status s = DeleteTrashFile(path_in_trash, &deleted_bytes, &is_complete);
+      Status s =
+          DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
       total_deleted_bytes += deleted_bytes;
       mu_.Lock();
       if (is_complete) {
@@ -254,6 +257,7 @@ void DeleteScheduler::BackgroundEmptyTrash() {
 }

 Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
+                                        const std::string& dir_to_sync,
                                         uint64_t* deleted_bytes,
                                         bool* is_complete) {
   uint64_t file_size;
@@ -286,6 +290,18 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,

     if (need_full_delete) {
       s = env_->DeleteFile(path_in_trash);
+      if (!dir_to_sync.empty()) {
+        std::unique_ptr<Directory> dir_obj;
+        if (s.ok()) {
+          s = env_->NewDirectory(dir_to_sync, &dir_obj);
+        }
+        if (s.ok()) {
+          s = dir_obj->Fsync();
+          TEST_SYNC_POINT_CALLBACK(
+              "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
+              reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
+        }
+      }
       *deleted_bytes = file_size;
       sst_file_manager_->OnDeleteFile(path_in_trash);
     }
diff --git a/util/delete_scheduler.h b/util/delete_scheduler.h
index cc456dcb9..0d94ca72d 100644
--- a/util/delete_scheduler.h
+++ b/util/delete_scheduler.h
@@ -47,7 +47,7 @@ class DeleteScheduler {
   }

   // Mark file as trash directory and schedule it's deletion
-  Status DeleteFile(const std::string& fname);
+  Status DeleteFile(const std::string& fname, const std::string& dir_to_sync);

   // Wait for all files being deleteing in the background to finish or for
   // destructor to be called.
@@ -82,6 +82,7 @@ class DeleteScheduler {
   Status MarkAsTrash(const std::string& file_path, std::string* path_in_trash);

   Status DeleteTrashFile(const std::string& path_in_trash,
+                         const std::string& dir_to_sync,
                          uint64_t* deleted_bytes, bool* is_complete);

   void BackgroundEmptyTrash();
@@ -93,8 +94,15 @@ class DeleteScheduler {
   std::atomic<int64_t> rate_bytes_per_sec_;
   // Mutex to protect queue_, pending_files_, bg_errors_, closing_
   InstrumentedMutex mu_;
+
+  struct FileAndDir {
+    FileAndDir(const std::string& f, const std::string& d) : fname(f), dir(d) {}
+    std::string fname;
+    std::string dir;  // empty will be skipped.
+  };
+
   // Queue of trash files that need to be deleted
-  std::queue<std::string> queue_;
+  std::queue<FileAndDir> queue_;
   // Number of trash files that are waiting to be deleted
   int32_t pending_files_;
   uint64_t bytes_max_delete_chunk_;
diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc
index f47469795..e7fd8d49f 100644
--- a/util/delete_scheduler_test.cc
+++ b/util/delete_scheduler_test.cc
@@ -127,6 +127,13 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DeleteScheduler::BackgroundEmptyTrash:Wait",
       [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
+  int dir_synced = 0;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteTrashFile::AfterSyncDir", [&](void* arg) {
+        dir_synced++;
+        std::string* dir = reinterpret_cast<std::string*>(arg);
+        EXPECT_EQ(dummy_files_dirs_[0], *dir);
+      });

   int num_files = 100;        // 100 files
   uint64_t file_size = 1024;  // every file is 1 kb
@@ -141,6 +148,7 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
     rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
     NewDeleteScheduler();

+    dir_synced = 0;
     // Create 100 dummy files, every file is 1 Kb
     std::vector<std::string> generated_files;
     for (int i = 0; i < num_files; i++) {
@@ -150,7 +158,8 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {

     // Delete dummy files and measure time spent to empty trash
     for (int i = 0; i < num_files; i++) {
-      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
+      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i],
+                                              dummy_files_dirs_[0]));
     }
     ASSERT_EQ(CountNormalFiles(), 0);

@@ -172,6 +181,8 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
     }
     ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);

+    ASSERT_EQ(num_files, dir_synced);
+
     ASSERT_EQ(CountTrashFiles(), 0);
     rocksdb::SyncPoint::GetInstance()->DisableProcessing();
   }
@@ -197,7 +208,7 @@ TEST_F(DeleteSchedulerTest, MultiDirectoryDeletionsScheduled) {

   // Mark dummy files as trash
   for (size_t i = 0; i < kNumFiles; i++) {
-    ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
+    ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
     ASSERT_EQ(0, CountNormalFiles(i));
     ASSERT_EQ(1, CountTrashFiles(i));
   }
@@ -260,7 +271,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
     int range_start = idx * num_files;
     int range_end = range_start + num_files;
     for (int j = range_start; j < range_end; j++) {
-      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j]));
+      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j], ""));
     }
   };

@@ -313,7 +324,7 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
   for (int i = 0; i < 10; i++) {
     // Every file we delete will be deleted immediately
     std::string dummy_file = NewDummyFile("dummy.data");
-    ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file));
+    ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
     ASSERT_TRUE(env_->FileExists(dummy_file).IsNotFound());
     ASSERT_EQ(CountNormalFiles(), 0);
     ASSERT_EQ(CountTrashFiles(), 0);
@@ -343,7 +354,7 @@ TEST_F(DeleteSchedulerTest, ConflictNames) {
   // Create "conflict.data" and move it to trash 10 times
   for (int i = 0; i < 10; i++) {
     std::string dummy_file = NewDummyFile("conflict.data");
-    ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file));
+    ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file, ""));
   }
   ASSERT_EQ(CountNormalFiles(), 0);
   // 10 files ("conflict.data" x 10) in trash
@@ -379,7 +390,7 @@ TEST_F(DeleteSchedulerTest, BackgroundError) {
   // Generate 10 dummy files and move them to trash
   for (int i = 0; i < 10; i++) {
     std::string file_name = "data_" + ToString(i) + ".data";
-    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
+    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
   }
   ASSERT_EQ(CountNormalFiles(), 0);
   ASSERT_EQ(CountTrashFiles(), 10);
@@ -421,7 +432,7 @@ TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {
   // Generate 10 dummy files and move them to trash
   for (int i = 0; i < 10; i++) {
     std::string file_name = "data_" + ToString(i) + ".data";
-    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
+    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
   }
   ASSERT_EQ(CountNormalFiles(), 0);
   delete_scheduler_->WaitForEmptyTrash();
@@ -450,10 +461,13 @@ TEST_F(DeleteSchedulerTest, DeletePartialFile) {
   NewDeleteScheduler();

   // Should delete in 4 batch
-  ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile("data_1", 500 * 1024)));
-  ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile("data_2", 100 * 1024)));
+  ASSERT_OK(
+      delete_scheduler_->DeleteFile(NewDummyFile("data_1", 500 * 1024), ""));
+  ASSERT_OK(
+      delete_scheduler_->DeleteFile(NewDummyFile("data_2", 100 * 1024), ""));
   // Should delete in 2 batch
-  ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile("data_2", 200 * 1024)));
+  ASSERT_OK(
+      delete_scheduler_->DeleteFile(NewDummyFile("data_2", 200 * 1024), ""));

   delete_scheduler_->WaitForEmptyTrash();
@@ -481,7 +495,7 @@ TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) {

   for (int i = 0; i < 100; i++) {
     std::string file_name = "data_" + ToString(i) + ".data";
-    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
+    ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name), ""));
   }

   // Deleting 100 files will need >28 hours to delete
@@ -542,7 +556,7 @@ TEST_F(DeleteSchedulerTest, DISABLED_DynamicRateLimiting1) {

     // Delete dummy files and measure time spent to empty trash
     for (int i = 0; i < num_files; i++) {
-      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
+      ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
     }
     ASSERT_EQ(CountNormalFiles(), 0);

@@ -602,7 +616,7 @@ TEST_F(DeleteSchedulerTest, ImmediateDeleteOn25PercDBSize) {
   }

   for (std::string& file_name : generated_files) {
-    delete_scheduler_->DeleteFile(file_name);
+    delete_scheduler_->DeleteFile(file_name, "");
   }

   // When we end up with 26 files in trash we will start
diff --git a/util/file_util.cc b/util/file_util.cc
index ba7e54fa5..195fe2ec4 100644
--- a/util/file_util.cc
+++ b/util/file_util.cc
@@ -82,16 +82,17 @@ Status CreateFile(Env* env, const std::string& destination,
 }

 Status DeleteSSTFile(const ImmutableDBOptions* db_options,
-                     const std::string& fname) {
+                     const std::string& fname, const std::string& dir_to_sync) {
 #ifndef ROCKSDB_LITE
   auto sfm =
       static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
   if (sfm) {
-    return sfm->ScheduleFileDeletion(fname);
+    return sfm->ScheduleFileDeletion(fname, dir_to_sync);
   } else {
     return db_options->env->DeleteFile(fname);
   }
 #else
+  (void)dir_to_sync;
   // SstFileManager is not supported in ROCKSDB_LITE
   return db_options->env->DeleteFile(fname);
 #endif
diff --git a/util/file_util.h b/util/file_util.h
index df220256f..4df597275 100644
--- a/util/file_util.h
+++ b/util/file_util.h
@@ -22,6 +22,7 @@ extern Status CreateFile(Env* env, const std::string& destination,
                          const std::string& contents);

 extern Status DeleteSSTFile(const ImmutableDBOptions* db_options,
-                            const std::string& fname);
+                            const std::string& fname,
+                            const std::string& path_to_sync);

 }  // namespace rocksdb
diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc
index 92f86f6d2..ea955e3fa 100644
--- a/util/sst_file_manager_impl.cc
+++ b/util/sst_file_manager_impl.cc
@@ -162,8 +162,9 @@ void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
   return delete_scheduler_.SetMaxTrashDBRatio(r);
 }

-Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) {
-  return delete_scheduler_.DeleteFile(file_path);
+Status SstFileManagerImpl::ScheduleFileDeletion(
+    const std::string& file_path, const std::string& path_to_sync) {
+  return delete_scheduler_.DeleteFile(file_path, path_to_sync);
 }

 void SstFileManagerImpl::WaitForEmptyTrash() {
@@ -218,7 +219,8 @@ SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,

         std::string path_in_trash = trash_dir + "/" + trash_file;
         res->OnAddFile(path_in_trash);
-        Status file_delete = res->ScheduleFileDeletion(path_in_trash);
+        Status file_delete =
+            res->ScheduleFileDeletion(path_in_trash, trash_dir);
         if (s.ok() && !file_delete.ok()) {
           s = file_delete;
         }
diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h
index 5d22725b3..73b9cf8b7 100644
--- a/util/sst_file_manager_impl.h
+++ b/util/sst_file_manager_impl.h
@@ -94,7 +94,8 @@ class SstFileManagerImpl : public SstFileManager {
   virtual void SetMaxTrashDBRatio(double ratio) override;

   // Mark file as trash and schedule it's deletion.
-  virtual Status ScheduleFileDeletion(const std::string& file_path);
+  virtual Status ScheduleFileDeletion(const std::string& file_path,
+                                      const std::string& dir_to_sync);

   // Wait for all files being deleteing in the background to finish or for
   // destructor to be called.
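
The pattern the patch adds boils down to: after a trash file is fully deleted, open its parent directory through Env::NewDirectory() and call Directory::Fsync(), so the unlink itself is made durable and trash trimming proceeds at the pace the rate limiter intends. The sketch below shows that pattern in isolation against the public rocksdb::Env API; it is illustrative only, the helper name DeleteFileAndSyncDir is invented here and is not part of the patch, and an empty dir_to_sync skips the sync, mirroring FileAndDir::dir.

    #include <memory>
    #include <string>

    #include "rocksdb/env.h"

    // Delete `fname`, then fsync its parent directory `dir_to_sync` so the
    // removal of the directory entry survives a crash. This mirrors what
    // DeleteScheduler::DeleteTrashFile does after a full (non-chunked) delete.
    rocksdb::Status DeleteFileAndSyncDir(rocksdb::Env* env,
                                         const std::string& fname,
                                         const std::string& dir_to_sync) {
      rocksdb::Status s = env->DeleteFile(fname);
      if (!s.ok() || dir_to_sync.empty()) {
        return s;  // empty dir_to_sync means "skip the directory sync"
      }
      std::unique_ptr<rocksdb::Directory> dir_obj;
      s = env->NewDirectory(dir_to_sync, &dir_obj);
      if (s.ok()) {
        s = dir_obj->Fsync();
      }
      return s;
    }

Note that DeleteScheduler::DeleteTrashFile only syncs the directory when it performs a full delete, not after each rate-limited truncation chunk, which is why the BasicRateLimiting test expects the AfterSyncDir sync point to fire exactly once per deleted file (ASSERT_EQ(num_files, dir_synced)).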