Support SstFileManager::SetDeleteRateBytesPerSecond()
Summary: Update DeleteScheduler component to support changing delete rate in runtime by introducing SstFileManager::SetDeleteRateBytesPerSecond() Closes https://github.com/facebook/rocksdb/pull/1994 Differential Revision: D4719906 Pulled By: IslamAbdelRahman fbshipit-source-id: e6b8d9e
This commit is contained in:
parent
5e0bddf837
commit
ee33e299a0
@ -328,9 +328,10 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
|
||||
std::string trash_dir = test::TmpDir(env_) + "/trash";
|
||||
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
|
||||
Status s;
|
||||
options.sst_file_manager.reset(NewSstFileManager(
|
||||
env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
|
||||
options.sst_file_manager.reset(
|
||||
NewSstFileManager(env_, nullptr, trash_dir, 0, false, &s));
|
||||
ASSERT_OK(s);
|
||||
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
|
||||
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
|
||||
|
||||
ASSERT_OK(TryReopen(options));
|
||||
|
@ -50,6 +50,11 @@ class SstFileManager {
|
||||
// Return delete rate limit in bytes per second.
|
||||
// thread-safe
|
||||
virtual int64_t GetDeleteRateBytesPerSecond() = 0;
|
||||
|
||||
// Update the delete rate limit in bytes per second.
|
||||
// zero means disable delete rate limiting and delete files immediately
|
||||
// thread-safe
|
||||
virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) = 0;
|
||||
};
|
||||
|
||||
// Create a new SstFileManager that can be shared among multiple RocksDB
|
||||
|
@ -29,13 +29,8 @@ DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir,
|
||||
cv_(&mu_),
|
||||
info_log_(info_log),
|
||||
sst_file_manager_(sst_file_manager) {
|
||||
if (rate_bytes_per_sec_ <= 0) {
|
||||
// Rate limiting is disabled
|
||||
bg_thread_.reset();
|
||||
} else {
|
||||
bg_thread_.reset(
|
||||
new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
|
||||
}
|
||||
bg_thread_.reset(
|
||||
new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
|
||||
}
|
||||
|
||||
DeleteScheduler::~DeleteScheduler() {
|
||||
@ -51,8 +46,9 @@ DeleteScheduler::~DeleteScheduler() {
|
||||
|
||||
Status DeleteScheduler::DeleteFile(const std::string& file_path) {
|
||||
Status s;
|
||||
if (rate_bytes_per_sec_ <= 0) {
|
||||
if (rate_bytes_per_sec_.load() <= 0) {
|
||||
// Rate limiting is disabled
|
||||
TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
|
||||
s = env_->DeleteFile(file_path);
|
||||
if (s.ok() && sst_file_manager_) {
|
||||
sst_file_manager_->OnDeleteFile(file_path);
|
||||
@ -147,7 +143,16 @@ void DeleteScheduler::BackgroundEmptyTrash() {
|
||||
// Delete all files in queue_
|
||||
uint64_t start_time = env_->NowMicros();
|
||||
uint64_t total_deleted_bytes = 0;
|
||||
int64_t current_delete_rate = rate_bytes_per_sec_.load();
|
||||
while (!queue_.empty() && !closing_) {
|
||||
if (current_delete_rate != rate_bytes_per_sec_.load()) {
|
||||
// User changed the delete rate
|
||||
current_delete_rate = rate_bytes_per_sec_.load();
|
||||
start_time = env_->NowMicros();
|
||||
total_deleted_bytes = 0;
|
||||
}
|
||||
|
||||
// Get new file to delete
|
||||
std::string path_in_trash = queue_.front();
|
||||
queue_.pop();
|
||||
|
||||
@ -164,9 +169,16 @@ void DeleteScheduler::BackgroundEmptyTrash() {
|
||||
}
|
||||
|
||||
// Apply penlty if necessary
|
||||
uint64_t total_penlty =
|
||||
((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_);
|
||||
while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
|
||||
uint64_t total_penlty;
|
||||
if (current_delete_rate > 0) {
|
||||
// rate limiting is enabled
|
||||
total_penlty =
|
||||
((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
|
||||
while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
|
||||
} else {
|
||||
// rate limiting is disabled
|
||||
total_penlty = 0;
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
|
||||
&total_penlty);
|
||||
|
||||
|
@ -39,7 +39,12 @@ class DeleteScheduler {
|
||||
~DeleteScheduler();
|
||||
|
||||
// Return delete rate limit in bytes per second
|
||||
int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; }
|
||||
int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_.load(); }
|
||||
|
||||
// Set delete rate limit in bytes per second
|
||||
void SetRateBytesPerSecond(int64_t bytes_per_sec) {
|
||||
return rate_bytes_per_sec_.store(bytes_per_sec);
|
||||
}
|
||||
|
||||
// Move file to trash directory and schedule it's deletion
|
||||
Status DeleteFile(const std::string& fname);
|
||||
@ -64,7 +69,7 @@ class DeleteScheduler {
|
||||
// Path to the trash directory
|
||||
std::string trash_dir_;
|
||||
// Maximum number of bytes that should be deleted per second
|
||||
int64_t rate_bytes_per_sec_;
|
||||
std::atomic<int64_t> rate_bytes_per_sec_;
|
||||
// Mutex to protect queue_, pending_files_, bg_errors_, closing_
|
||||
InstrumentedMutex mu_;
|
||||
// Queue of files in trash that need to be deleted
|
||||
|
@ -422,6 +422,90 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) {
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DeleteSchedulerTest, DynamicRateLimiting1) {
|
||||
std::vector<uint64_t> penalties;
|
||||
int bg_delete_file = 0;
|
||||
int fg_delete_file = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DeleteScheduler::DeleteTrashFile:DeleteFile",
|
||||
[&](void* arg) { bg_delete_file++; });
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DeleteScheduler::DeleteFile",
|
||||
[&](void* arg) { fg_delete_file++; });
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"DeleteScheduler::BackgroundEmptyTrash:Wait",
|
||||
[&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
||||
{"DeleteSchedulerTest::DynamicRateLimiting1:1",
|
||||
"DeleteScheduler::BackgroundEmptyTrash"},
|
||||
});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
rate_bytes_per_sec_ = 0; // Disable rate limiting initially
|
||||
NewDeleteScheduler();
|
||||
|
||||
|
||||
int num_files = 10; // 10 files
|
||||
uint64_t file_size = 1024; // every file is 1 kb
|
||||
|
||||
std::vector<int64_t> delete_kbs_per_sec = {512, 200, 0, 100, 50, -2, 25};
|
||||
for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
|
||||
penalties.clear();
|
||||
bg_delete_file = 0;
|
||||
fg_delete_file = 0;
|
||||
rocksdb::SyncPoint::GetInstance()->ClearTrace();
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
DestroyAndCreateDir(dummy_files_dir_);
|
||||
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
|
||||
delete_scheduler_->SetRateBytesPerSecond(rate_bytes_per_sec_);
|
||||
|
||||
// Create 100 dummy files, every file is 1 Kb
|
||||
std::vector<std::string> generated_files;
|
||||
for (int i = 0; i < num_files; i++) {
|
||||
std::string file_name = "file" + ToString(i) + ".data";
|
||||
generated_files.push_back(NewDummyFile(file_name, file_size));
|
||||
}
|
||||
|
||||
// Delete dummy files and measure time spent to empty trash
|
||||
for (int i = 0; i < num_files; i++) {
|
||||
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
|
||||
}
|
||||
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
|
||||
|
||||
if (rate_bytes_per_sec_ > 0) {
|
||||
uint64_t delete_start_time = env_->NowMicros();
|
||||
TEST_SYNC_POINT("DeleteSchedulerTest::DynamicRateLimiting1:1");
|
||||
delete_scheduler_->WaitForEmptyTrash();
|
||||
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
|
||||
|
||||
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
|
||||
ASSERT_EQ(bg_errors.size(), 0);
|
||||
|
||||
uint64_t total_files_size = 0;
|
||||
uint64_t expected_penlty = 0;
|
||||
ASSERT_EQ(penalties.size(), num_files);
|
||||
for (int i = 0; i < num_files; i++) {
|
||||
total_files_size += file_size;
|
||||
expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_);
|
||||
ASSERT_EQ(expected_penlty, penalties[i]);
|
||||
}
|
||||
ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
|
||||
ASSERT_EQ(bg_delete_file, num_files);
|
||||
ASSERT_EQ(fg_delete_file, 0);
|
||||
} else {
|
||||
ASSERT_EQ(penalties.size(), 0);
|
||||
ASSERT_EQ(bg_delete_file, 0);
|
||||
ASSERT_EQ(fg_delete_file, num_files);
|
||||
}
|
||||
|
||||
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -87,6 +87,10 @@ int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
|
||||
return delete_scheduler_.GetRateBytesPerSecond();
|
||||
}
|
||||
|
||||
void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
|
||||
return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
|
||||
}
|
||||
|
||||
Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) {
|
||||
return delete_scheduler_.DeleteFile(file_path);
|
||||
}
|
||||
@ -127,7 +131,7 @@ SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
|
||||
new SstFileManagerImpl(env, info_log, trash_dir, rate_bytes_per_sec);
|
||||
|
||||
Status s;
|
||||
if (trash_dir != "" && rate_bytes_per_sec > 0) {
|
||||
if (trash_dir != "") {
|
||||
s = env->CreateDirIfMissing(trash_dir);
|
||||
if (s.ok() && delete_existing_trash) {
|
||||
std::vector<std::string> files_in_trash;
|
||||
|
@ -64,6 +64,9 @@ class SstFileManagerImpl : public SstFileManager {
|
||||
// Return delete rate limit in bytes per second.
|
||||
virtual int64_t GetDeleteRateBytesPerSecond() override;
|
||||
|
||||
// Update the delete rate limit in bytes per second.
|
||||
virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) override;
|
||||
|
||||
// Move file to trash directory and schedule it's deletion.
|
||||
virtual Status ScheduleFileDeletion(const std::string& file_path);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user