Blob DB: Evict oldest blob file when close to blob db size limit
Summary: Evict oldest blob file and put it in obsolete_files list when close to blob db size limit. The file will be delete when the `DeleteObsoleteFiles` background job runs next time. For now I set `kEvictOldestFileAtSize` constant, which controls when to evict the oldest file, at 90%. It could be tweaked or made into an option if really needed; I didn't want to expose it as an option pre-maturely as there are already too many :) . Closes https://github.com/facebook/rocksdb/pull/3094 Differential Revision: D6187340 Pulled By: sagar0 fbshipit-source-id: 687f8262101b9301bf964b94025a2fe9d8573421
This commit is contained in:
parent
c1e99eddc8
commit
f98efcb1e3
@ -62,7 +62,7 @@ bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs,
|
|||||||
if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
|
if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return lhs->BlobFileNumber() > rhs->BlobFileNumber();
|
return lhs->BlobFileNumber() < rhs->BlobFileNumber();
|
||||||
}
|
}
|
||||||
|
|
||||||
void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
|
void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
|
||||||
@ -117,7 +117,8 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
|
|||||||
total_periods_ampl_(0),
|
total_periods_ampl_(0),
|
||||||
total_blob_space_(0),
|
total_blob_space_(0),
|
||||||
open_p1_done_(false),
|
open_p1_done_(false),
|
||||||
debug_level_(0) {
|
debug_level_(0),
|
||||||
|
oldest_file_evicted_(false) {
|
||||||
blob_dir_ = (bdb_options_.path_relative)
|
blob_dir_ = (bdb_options_.path_relative)
|
||||||
? dbname + "/" + bdb_options_.blob_dir
|
? dbname + "/" + bdb_options_.blob_dir
|
||||||
: bdb_options_.blob_dir;
|
: bdb_options_.blob_dir;
|
||||||
@ -171,7 +172,8 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
|
|||||||
last_period_ampl_(0),
|
last_period_ampl_(0),
|
||||||
total_periods_write_(0),
|
total_periods_write_(0),
|
||||||
total_periods_ampl_(0),
|
total_periods_ampl_(0),
|
||||||
total_blob_space_(0) {
|
total_blob_space_(0),
|
||||||
|
oldest_file_evicted_(false) {
|
||||||
if (!bdb_options_.blob_dir.empty())
|
if (!bdb_options_.blob_dir.empty())
|
||||||
blob_dir_ = (bdb_options_.path_relative)
|
blob_dir_ = (bdb_options_.path_relative)
|
||||||
? db_->GetName() + "/" + bdb_options_.blob_dir
|
? db_->GetName() + "/" + bdb_options_.blob_dir
|
||||||
@ -931,20 +933,56 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
|||||||
return has_expiration ? expiration : kNoExpiration;
|
return has_expiration ? expiration : kNoExpiration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<BlobFile> BlobDBImpl::GetOldestBlobFile() {
|
||||||
|
std::vector<std::shared_ptr<BlobFile>> blob_files;
|
||||||
|
CopyBlobFiles(&blob_files, [](const std::shared_ptr<BlobFile>& f) {
|
||||||
|
return !f->Obsolete() && f->Immutable();
|
||||||
|
});
|
||||||
|
blobf_compare_ttl compare;
|
||||||
|
return *std::min_element(blob_files.begin(), blob_files.end(), compare);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool BlobDBImpl::EvictOldestBlobFile() {
|
||||||
|
auto oldest_file = GetOldestBlobFile();
|
||||||
|
if (oldest_file == nullptr) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
WriteLock wl(&mutex_);
|
||||||
|
oldest_file->SetCanBeDeleted();
|
||||||
|
obsolete_files_.push_front(oldest_file);
|
||||||
|
oldest_file_evicted_.store(true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status BlobDBImpl::CheckSize(size_t blob_size) {
|
||||||
|
uint64_t new_space_util = total_blob_space_.load() + blob_size;
|
||||||
|
if (bdb_options_.blob_dir_size > 0) {
|
||||||
|
if (!bdb_options_.is_fifo &&
|
||||||
|
(new_space_util > bdb_options_.blob_dir_size)) {
|
||||||
|
return Status::NoSpace(
|
||||||
|
"Write failed, as writing it would exceed blob_dir_size limit.");
|
||||||
|
}
|
||||||
|
if (bdb_options_.is_fifo && !oldest_file_evicted_.load() &&
|
||||||
|
(new_space_util >
|
||||||
|
kEvictOldestFileAtSize * bdb_options_.blob_dir_size)) {
|
||||||
|
EvictOldestBlobFile();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
||||||
const std::string& headerbuf, const Slice& key,
|
const std::string& headerbuf, const Slice& key,
|
||||||
const Slice& value, uint64_t expiration,
|
const Slice& value, uint64_t expiration,
|
||||||
std::string* index_entry) {
|
std::string* index_entry) {
|
||||||
auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
|
auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
|
||||||
if (bdb_options_.blob_dir_size > 0 &&
|
Status s = CheckSize(size_put);
|
||||||
(total_blob_space_.load() + size_put) > bdb_options_.blob_dir_size) {
|
if (!s.ok()) {
|
||||||
if (!bdb_options_.is_fifo) {
|
return s;
|
||||||
return Status::NoSpace("Blob DB reached the maximum configured size.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Status s;
|
|
||||||
|
|
||||||
uint64_t blob_offset = 0;
|
uint64_t blob_offset = 0;
|
||||||
uint64_t key_offset = 0;
|
uint64_t key_offset = 0;
|
||||||
{
|
{
|
||||||
@ -1910,7 +1948,12 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// directory change. Fsync
|
// directory change. Fsync
|
||||||
if (file_deleted) dir_ent_->Fsync();
|
if (file_deleted) {
|
||||||
|
dir_ent_->Fsync();
|
||||||
|
|
||||||
|
// reset oldest_file_evicted flag
|
||||||
|
oldest_file_evicted_.store(false);
|
||||||
|
}
|
||||||
|
|
||||||
// put files back into obsolete if for some reason, delete failed
|
// put files back into obsolete if for some reason, delete failed
|
||||||
if (!tobsolete.empty()) {
|
if (!tobsolete.empty()) {
|
||||||
@ -1924,13 +1967,18 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void BlobDBImpl::CopyBlobFiles(
|
void BlobDBImpl::CopyBlobFiles(
|
||||||
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
|
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy,
|
||||||
|
std::function<bool(const std::shared_ptr<BlobFile>&)> predicate) {
|
||||||
ReadLock rl(&mutex_);
|
ReadLock rl(&mutex_);
|
||||||
|
|
||||||
// take a copy
|
|
||||||
bfiles_copy->reserve(blob_files_.size());
|
|
||||||
for (auto const& p : blob_files_) {
|
for (auto const& p : blob_files_) {
|
||||||
bfiles_copy->push_back(p.second);
|
bool pred_value = true;
|
||||||
|
if (predicate) {
|
||||||
|
pred_value = predicate(p.second);
|
||||||
|
}
|
||||||
|
if (pred_value) {
|
||||||
|
bfiles_copy->push_back(p.second);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,6 +205,10 @@ class BlobDBImpl : public BlobDB {
|
|||||||
// how often to schedule check seq files period
|
// how often to schedule check seq files period
|
||||||
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
||||||
|
|
||||||
|
// when should oldest file be evicted:
|
||||||
|
// on reaching 90% of blob_dir_size
|
||||||
|
static constexpr double kEvictOldestFileAtSize = 0.9;
|
||||||
|
|
||||||
using BlobDB::Put;
|
using BlobDB::Put;
|
||||||
Status Put(const WriteOptions& options, const Slice& key,
|
Status Put(const WriteOptions& options, const Slice& key,
|
||||||
const Slice& value) override;
|
const Slice& value) override;
|
||||||
@ -414,7 +418,9 @@ class BlobDBImpl : public BlobDB {
|
|||||||
bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
|
bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
|
||||||
uint64_t blob_offset, uint64_t blob_size);
|
uint64_t blob_offset, uint64_t blob_size);
|
||||||
|
|
||||||
void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
|
void CopyBlobFiles(
|
||||||
|
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy,
|
||||||
|
std::function<bool(const std::shared_ptr<BlobFile>&)> predicate = {});
|
||||||
|
|
||||||
void FilterSubsetOfFiles(
|
void FilterSubsetOfFiles(
|
||||||
const std::vector<std::shared_ptr<BlobFile>>& blob_files,
|
const std::vector<std::shared_ptr<BlobFile>>& blob_files,
|
||||||
@ -423,6 +429,12 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
|
uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
|
||||||
|
|
||||||
|
Status CheckSize(size_t blob_size);
|
||||||
|
|
||||||
|
std::shared_ptr<BlobFile> GetOldestBlobFile();
|
||||||
|
|
||||||
|
bool EvictOldestBlobFile();
|
||||||
|
|
||||||
// the base DB
|
// the base DB
|
||||||
DBImpl* db_impl_;
|
DBImpl* db_impl_;
|
||||||
Env* env_;
|
Env* env_;
|
||||||
@ -526,6 +538,8 @@ class BlobDBImpl : public BlobDB {
|
|||||||
bool open_p1_done_;
|
bool open_p1_done_;
|
||||||
|
|
||||||
uint32_t debug_level_;
|
uint32_t debug_level_;
|
||||||
|
|
||||||
|
std::atomic<bool> oldest_file_evicted_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace blob_db
|
} // namespace blob_db
|
||||||
|
@ -700,12 +700,15 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
|
|||||||
VerifyDB({{"foo", "v2"}});
|
VerifyDB({{"foo", "v2"}});
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
|
// This test is no longer valid since we now return an error when we go
|
||||||
|
// over the configured blob_dir_size.
|
||||||
|
// The test needs to be re-written later in such a way that writes continue
|
||||||
|
// after a GC happens.
|
||||||
|
TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
|
||||||
// Use mock env to stop wall clock.
|
// Use mock env to stop wall clock.
|
||||||
Options options;
|
Options options;
|
||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.is_fifo = true;
|
|
||||||
bdb_options.blob_dir_size = 100;
|
bdb_options.blob_dir_size = 100;
|
||||||
bdb_options.blob_file_size = 100;
|
bdb_options.blob_file_size = 100;
|
||||||
bdb_options.min_blob_size = 0;
|
bdb_options.min_blob_size = 0;
|
||||||
@ -927,7 +930,7 @@ TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Test to verify that a NoSpace IOError Status is returned on reaching
|
// Test to verify that a NoSpace IOError Status is returned on reaching
|
||||||
// blob_dir_size limit.
|
// blob_dir_size limit.
|
||||||
TEST_F(BlobDBTest, OutOfSpace) {
|
TEST_F(BlobDBTest, OutOfSpace) {
|
||||||
// Use mock env to stop wall clock.
|
// Use mock env to stop wall clock.
|
||||||
Options options;
|
Options options;
|
||||||
@ -949,6 +952,41 @@ TEST_F(BlobDBTest, OutOfSpace) {
|
|||||||
ASSERT_TRUE(s.IsNoSpace());
|
ASSERT_TRUE(s.IsNoSpace());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) {
|
||||||
|
// Use mock env to stop wall clock.
|
||||||
|
Options options;
|
||||||
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.blob_dir_size = 270;
|
||||||
|
bdb_options.blob_file_size = 100;
|
||||||
|
bdb_options.disable_background_tasks = true;
|
||||||
|
bdb_options.is_fifo = true;
|
||||||
|
Open(bdb_options);
|
||||||
|
|
||||||
|
// Each stored blob has an overhead of about 32 bytes currently.
|
||||||
|
// So a 100 byte blob should take up 132 bytes.
|
||||||
|
std::string value(100, 'v');
|
||||||
|
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
|
||||||
|
|
||||||
|
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
|
||||||
|
auto blob_files = bdb_impl->TEST_GetBlobFiles();
|
||||||
|
ASSERT_EQ(1, blob_files.size());
|
||||||
|
|
||||||
|
// Adding another 100 byte blob would take the total size to 264 bytes
|
||||||
|
// (2*132), which is more than 90% of blob_dir_size. So, the oldest file
|
||||||
|
// should be evicted and put in obsolete files list.
|
||||||
|
ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
|
||||||
|
|
||||||
|
auto obsolete_files = bdb_impl->TEST_GetObsoleteFiles();
|
||||||
|
ASSERT_EQ(1, obsolete_files.size());
|
||||||
|
ASSERT_TRUE(obsolete_files[0]->Immutable());
|
||||||
|
ASSERT_EQ(blob_files[0]->BlobFileNumber(),
|
||||||
|
obsolete_files[0]->BlobFileNumber());
|
||||||
|
|
||||||
|
bdb_impl->TEST_DeleteObsoleteFiles();
|
||||||
|
obsolete_files = bdb_impl->TEST_GetObsoleteFiles();
|
||||||
|
ASSERT_TRUE(obsolete_files.empty());
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(BlobDBTest, InlineSmallValues) {
|
TEST_F(BlobDBTest, InlineSmallValues) {
|
||||||
constexpr uint64_t kMaxExpiration = 1000;
|
constexpr uint64_t kMaxExpiration = 1000;
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user