BlobDB: Implement DisableFileDeletions (#4314)
Summary: `DB::DiableFileDeletions` and `DB::EnableFileDeletions` are used for applications to stop RocksDB background jobs to delete files while they are doing replication. Implement these methods for BlobDB. `DeleteObsolteFiles` now needs to check `disable_file_deletions_` before starting, and will hold `delete_file_mutex_` the whole time while it is running. `DisableFileDeletions` needs to wait on `delete_file_mutex_` for running `DeleteObsolteFiles` job and set `disable_file_deletions_` flag. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4314 Differential Revision: D9501373 Pulled By: yiwu-arbug fbshipit-source-id: 81064c1228f1724eff46da22b50ff765b16292cd
This commit is contained in:
parent
096e5f5d02
commit
023e9bf214
@ -612,6 +612,7 @@ set(SOURCES
|
||||
utilities/blob_db/blob_compaction_filter.cc
|
||||
utilities/blob_db/blob_db.cc
|
||||
utilities/blob_db/blob_db_impl.cc
|
||||
utilities/blob_db/blob_db_impl_filesnapshot.cc
|
||||
utilities/blob_db/blob_dump_tool.cc
|
||||
utilities/blob_db/blob_file.cc
|
||||
utilities/blob_db/blob_log_reader.cc
|
||||
|
22
TARGETS
22
TARGETS
@ -233,6 +233,7 @@ cpp_library(
|
||||
"utilities/blob_db/blob_compaction_filter.cc",
|
||||
"utilities/blob_db/blob_db.cc",
|
||||
"utilities/blob_db/blob_db_impl.cc",
|
||||
"utilities/blob_db/blob_db_impl_filesnapshot.cc",
|
||||
"utilities/blob_db/blob_dump_tool.cc",
|
||||
"utilities/blob_db/blob_file.cc",
|
||||
"utilities/blob_db/blob_log_format.cc",
|
||||
@ -376,11 +377,6 @@ ROCKS_TESTS = [
|
||||
"table/block_based_filter_block_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"data_block_hash_index_test",
|
||||
"table/data_block_hash_index_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"block_test",
|
||||
"table/block_test.cc",
|
||||
@ -506,6 +502,11 @@ ROCKS_TESTS = [
|
||||
"table/cuckoo_table_reader_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"data_block_hash_index_test",
|
||||
"table/data_block_hash_index_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"date_tiered_test",
|
||||
"utilities/date_tiered/date_tiered_test.cc",
|
||||
@ -956,11 +957,6 @@ ROCKS_TESTS = [
|
||||
"tools/sst_dump_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"trace_analyzer_test",
|
||||
"tools/trace_analyzer_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"statistics_test",
|
||||
"monitoring/statistics_test.cc",
|
||||
@ -996,6 +992,11 @@ ROCKS_TESTS = [
|
||||
"util/timer_queue_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"trace_analyzer_test",
|
||||
"tools/trace_analyzer_test.cc",
|
||||
"serial",
|
||||
],
|
||||
[
|
||||
"transaction_test",
|
||||
"utilities/transactions/transaction_test.cc",
|
||||
@ -1094,3 +1095,4 @@ if not is_opt_mode:
|
||||
deps = [":" + test_bin],
|
||||
command = [TEST_RUNNER, BUCK_BINS + test_bin]
|
||||
)
|
||||
|
||||
|
2
src.mk
2
src.mk
@ -122,6 +122,7 @@ LIB_SOURCES = \
|
||||
table/plain_table_reader.cc \
|
||||
table/sst_file_writer.cc \
|
||||
table/table_properties.cc \
|
||||
tools/trace_analyzer_tool.cc \
|
||||
table/two_level_iterator.cc \
|
||||
tools/dump/db_dump_tool.cc \
|
||||
util/arena.cc \
|
||||
@ -161,6 +162,7 @@ LIB_SOURCES = \
|
||||
utilities/blob_db/blob_compaction_filter.cc \
|
||||
utilities/blob_db/blob_db.cc \
|
||||
utilities/blob_db/blob_db_impl.cc \
|
||||
utilities/blob_db/blob_db_impl_filesnapshot.cc \
|
||||
utilities/blob_db/blob_file.cc \
|
||||
utilities/blob_db/blob_log_format.cc \
|
||||
utilities/blob_db/blob_log_reader.cc \
|
||||
|
@ -621,39 +621,6 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
||||
return db_->Write(options, blob_inserter.batch());
|
||||
}
|
||||
|
||||
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
ret.reserve(ret.size() + blob_files_.size());
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
ret.emplace_back(blob_file->PathName());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
db_->GetLiveFilesMetaData(metadata);
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
LiveFileMetaData filemetadata;
|
||||
filemetadata.size = blob_file->GetFileSize();
|
||||
filemetadata.name = blob_file->PathName();
|
||||
auto cfh =
|
||||
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||
filemetadata.column_family_name = cfh->GetName();
|
||||
metadata->emplace_back(filemetadata);
|
||||
}
|
||||
}
|
||||
|
||||
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value) {
|
||||
return PutUntil(options, key, value, kNoExpiration);
|
||||
@ -1680,16 +1647,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
||||
}
|
||||
|
||||
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
||||
if (aborted) return std::make_pair(false, -1);
|
||||
if (aborted) {
|
||||
return std::make_pair(false, -1);
|
||||
}
|
||||
|
||||
{
|
||||
ReadLock rl(&mutex_);
|
||||
if (obsolete_files_.empty()) return std::make_pair(true, -1);
|
||||
MutexLock delete_file_lock(&delete_file_mutex_);
|
||||
if (disable_file_deletions_ > 0) {
|
||||
return std::make_pair(true, -1);
|
||||
}
|
||||
|
||||
std::list<std::shared_ptr<BlobFile>> tobsolete;
|
||||
{
|
||||
WriteLock wl(&mutex_);
|
||||
if (obsolete_files_.empty()) {
|
||||
return std::make_pair(true, -1);
|
||||
}
|
||||
tobsolete.swap(obsolete_files_);
|
||||
}
|
||||
|
||||
|
@ -155,12 +155,6 @@ class BlobDBImpl : public BlobDB {
|
||||
|
||||
virtual Status Close() override;
|
||||
|
||||
virtual Status GetLiveFiles(std::vector<std::string>&,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable = true) override;
|
||||
virtual void GetLiveFilesMetaData(
|
||||
std::vector<LiveFileMetaData>* ) override;
|
||||
|
||||
using BlobDB::PutWithTTL;
|
||||
Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
||||
const Slice& value, uint64_t ttl) override;
|
||||
@ -175,6 +169,15 @@ class BlobDBImpl : public BlobDB {
|
||||
const DBOptions& db_options,
|
||||
const ColumnFamilyOptions& cf_options);
|
||||
|
||||
virtual Status DisableFileDeletions() override;
|
||||
|
||||
virtual Status EnableFileDeletions(bool force) override;
|
||||
|
||||
virtual Status GetLiveFiles(std::vector<std::string>&,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable = true) override;
|
||||
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
|
||||
|
||||
~BlobDBImpl();
|
||||
|
||||
Status Open(std::vector<ColumnFamilyHandle*>* handles);
|
||||
@ -408,6 +411,26 @@ class BlobDBImpl : public BlobDB {
|
||||
|
||||
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
|
||||
|
||||
// DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
|
||||
// on the mutex to avoid contention.
|
||||
//
|
||||
// While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
|
||||
// the difference. mutex_ only needs to be held when access the
|
||||
// data-structure, and delete_file_mutex_ needs to be held the whole time
|
||||
// during DeleteObsoleteFiles to avoid being run simultaneously with
|
||||
// DisableFileDeletions.
|
||||
//
|
||||
// If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
|
||||
// to hold delete_file_mutex_ first to avoid deadlock.
|
||||
mutable port::Mutex delete_file_mutex_;
|
||||
|
||||
// Each call of DisableFileDeletions will increase disable_file_deletion_
|
||||
// by 1. EnableFileDeletions will either decrease the count by 1 or reset
|
||||
// it to zeor, depending on the force flag.
|
||||
//
|
||||
// REQUIRES: access with delete_file_mutex_ held.
|
||||
int disable_file_deletions_ = 0;
|
||||
|
||||
uint32_t debug_level_;
|
||||
};
|
||||
|
||||
|
97
utilities/blob_db/blob_db_impl_filesnapshot.cc
Normal file
97
utilities/blob_db/blob_db_impl_filesnapshot.cc
Normal file
@ -0,0 +1,97 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "utilities/blob_db/blob_db_impl.h"
|
||||
|
||||
#include "util/logging.h"
|
||||
#include "util/mutexlock.h"
|
||||
|
||||
// BlobDBImpl methods to get snapshot of files, e.g. for replication.
|
||||
|
||||
namespace rocksdb {
|
||||
namespace blob_db {
|
||||
|
||||
Status BlobDBImpl::DisableFileDeletions() {
|
||||
// Disable base DB file deletions.
|
||||
Status s = db_impl_->DisableFileDeletions();
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
{
|
||||
// Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
|
||||
// is running.
|
||||
MutexLock l(&delete_file_mutex_);
|
||||
count = ++disable_file_deletions_;
|
||||
}
|
||||
|
||||
ROCKS_LOG_INFO(db_options_.info_log,
|
||||
"Disalbed blob file deletions. count: %d", count);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlobDBImpl::EnableFileDeletions(bool force) {
|
||||
// Enable base DB file deletions.
|
||||
Status s = db_impl_->EnableFileDeletions(force);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
int count = 0;
|
||||
{
|
||||
MutexLock l(&delete_file_mutex_);
|
||||
if (force) {
|
||||
disable_file_deletions_ = 0;
|
||||
} else if (disable_file_deletions_ > 0) {
|
||||
count = --disable_file_deletions_;
|
||||
}
|
||||
assert(count >= 0);
|
||||
}
|
||||
|
||||
ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
|
||||
count);
|
||||
// Consider trigger DeleteobsoleteFiles once after re-enabled, if we are to
|
||||
// make DeleteobsoleteFiles re-run interval configuration.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
||||
uint64_t* manifest_file_size,
|
||||
bool flush_memtable) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
ret.reserve(ret.size() + blob_files_.size());
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
ret.emplace_back(blob_file->PathName());
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
// Hold a lock in the beginning to avoid updates to base DB during the call
|
||||
ReadLock rl(&mutex_);
|
||||
db_->GetLiveFilesMetaData(metadata);
|
||||
for (auto bfile_pair : blob_files_) {
|
||||
auto blob_file = bfile_pair.second;
|
||||
LiveFileMetaData filemetadata;
|
||||
filemetadata.size = blob_file->GetFileSize();
|
||||
filemetadata.name = blob_file->PathName();
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||
filemetadata.column_family_name = cfh->GetName();
|
||||
metadata->emplace_back(filemetadata);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
#endif // !ROCKSDB_LITE
|
@ -1415,6 +1415,47 @@ TEST_F(BlobDBTest, EvictExpiredFile) {
|
||||
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, DisableFileDeletions) {
|
||||
BlobDBOptions bdb_options;
|
||||
bdb_options.disable_background_tasks = true;
|
||||
Open(bdb_options);
|
||||
std::map<std::string, std::string> data;
|
||||
for (bool force : {true, false}) {
|
||||
ASSERT_OK(Put("foo", "v", &data));
|
||||
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
|
||||
ASSERT_EQ(1, blob_files.size());
|
||||
auto blob_file = blob_files[0];
|
||||
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
|
||||
blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
// Call DisableFileDeletions twice.
|
||||
ASSERT_OK(blob_db_->DisableFileDeletions());
|
||||
ASSERT_OK(blob_db_->DisableFileDeletions());
|
||||
// File deletions should be disabled.
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
VerifyDB(data);
|
||||
// Enable file deletions once. If force=true, file deletion is enabled.
|
||||
// Otherwise it needs to enable it for a second time.
|
||||
ASSERT_OK(blob_db_->EnableFileDeletions(force));
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
if (!force) {
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
VerifyDB(data);
|
||||
// Call EnableFileDeletions a second time.
|
||||
ASSERT_OK(blob_db_->EnableFileDeletions(false));
|
||||
blob_db_impl()->TEST_DeleteObsoleteFiles();
|
||||
}
|
||||
// Regardless of value of `force`, file should be deleted by now.
|
||||
ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
|
||||
ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
|
||||
VerifyDB({});
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user