diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 7c27be789..db01aadf7 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -62,7 +62,7 @@ bool blobf_compare_ttl::operator()(const std::shared_ptr& lhs, if (lhs->expiration_range_.first > rhs->expiration_range_.first) { return false; } - return lhs->BlobFileNumber() > rhs->BlobFileNumber(); + return lhs->BlobFileNumber() < rhs->BlobFileNumber(); } void EvictAllVersionsCompactionListener::InternalListener::OnCompaction( @@ -117,7 +117,8 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, total_periods_ampl_(0), total_blob_space_(0), open_p1_done_(false), - debug_level_(0) { + debug_level_(0), + oldest_file_evicted_(false) { blob_dir_ = (bdb_options_.path_relative) ? dbname + "/" + bdb_options_.blob_dir : bdb_options_.blob_dir; @@ -171,7 +172,8 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options) last_period_ampl_(0), total_periods_write_(0), total_periods_ampl_(0), - total_blob_space_(0) { + total_blob_space_(0), + oldest_file_evicted_(false) { if (!bdb_options_.blob_dir.empty()) blob_dir_ = (bdb_options_.path_relative) ? db_->GetName() + "/" + bdb_options_.blob_dir @@ -931,20 +933,56 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value, return has_expiration ? expiration : kNoExpiration; } +std::shared_ptr BlobDBImpl::GetOldestBlobFile() { + std::vector> blob_files; + CopyBlobFiles(&blob_files, [](const std::shared_ptr& f) { + return !f->Obsolete() && f->Immutable(); + }); + blobf_compare_ttl compare; + return *std::min_element(blob_files.begin(), blob_files.end(), compare); +} + +bool BlobDBImpl::EvictOldestBlobFile() { + auto oldest_file = GetOldestBlobFile(); + if (oldest_file == nullptr) { + return false; + } + + WriteLock wl(&mutex_); + oldest_file->SetCanBeDeleted(); + obsolete_files_.push_front(oldest_file); + oldest_file_evicted_.store(true); + return true; +} + +Status BlobDBImpl::CheckSize(size_t blob_size) { + uint64_t new_space_util = total_blob_space_.load() + blob_size; + if (bdb_options_.blob_dir_size > 0) { + if (!bdb_options_.is_fifo && + (new_space_util > bdb_options_.blob_dir_size)) { + return Status::NoSpace( + "Write failed, as writing it would exceed blob_dir_size limit."); + } + if (bdb_options_.is_fifo && !oldest_file_evicted_.load() && + (new_space_util > + kEvictOldestFileAtSize * bdb_options_.blob_dir_size)) { + EvictOldestBlobFile(); + } + } + + return Status::OK(); +} + Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, const Slice& value, uint64_t expiration, std::string* index_entry) { auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size(); - if (bdb_options_.blob_dir_size > 0 && - (total_blob_space_.load() + size_put) > bdb_options_.blob_dir_size) { - if (!bdb_options_.is_fifo) { - return Status::NoSpace("Blob DB reached the maximum configured size."); - } + Status s = CheckSize(size_put); + if (!s.ok()) { + return s; } - Status s; - uint64_t blob_offset = 0; uint64_t key_offset = 0; { @@ -1910,7 +1948,12 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { } // directory change. Fsync - if (file_deleted) dir_ent_->Fsync(); + if (file_deleted) { + dir_ent_->Fsync(); + + // reset oldest_file_evicted flag + oldest_file_evicted_.store(false); + } // put files back into obsolete if for some reason, delete failed if (!tobsolete.empty()) { @@ -1924,13 +1967,18 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { } void BlobDBImpl::CopyBlobFiles( - std::vector>* bfiles_copy) { + std::vector>* bfiles_copy, + std::function&)> predicate) { ReadLock rl(&mutex_); - // take a copy - bfiles_copy->reserve(blob_files_.size()); for (auto const& p : blob_files_) { - bfiles_copy->push_back(p.second); + bool pred_value = true; + if (predicate) { + pred_value = predicate(p.second); + } + if (pred_value) { + bfiles_copy->push_back(p.second); + } } } diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index f90363008..fc36712be 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -205,6 +205,10 @@ class BlobDBImpl : public BlobDB { // how often to schedule check seq files period static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000; + // when should oldest file be evicted: + // on reaching 90% of blob_dir_size + static constexpr double kEvictOldestFileAtSize = 0.9; + using BlobDB::Put; Status Put(const WriteOptions& options, const Slice& key, const Slice& value) override; @@ -414,7 +418,9 @@ class BlobDBImpl : public BlobDB { bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, uint64_t blob_offset, uint64_t blob_size); - void CopyBlobFiles(std::vector>* bfiles_copy); + void CopyBlobFiles( + std::vector>* bfiles_copy, + std::function&)> predicate = {}); void FilterSubsetOfFiles( const std::vector>& blob_files, @@ -423,6 +429,12 @@ class BlobDBImpl : public BlobDB { uint64_t EpochNow() { return env_->NowMicros() / 1000000; } + Status CheckSize(size_t blob_size); + + std::shared_ptr GetOldestBlobFile(); + + bool EvictOldestBlobFile(); + // the base DB DBImpl* db_impl_; Env* env_; @@ -526,6 +538,8 @@ class BlobDBImpl : public BlobDB { bool open_p1_done_; uint32_t debug_level_; + + std::atomic oldest_file_evicted_; }; } // namespace blob_db diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 4dcf1a752..0eb4d791b 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -700,12 +700,15 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { VerifyDB({{"foo", "v2"}}); } -TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) { +// This test is no longer valid since we now return an error when we go +// over the configured blob_dir_size. +// The test needs to be re-written later in such a way that writes continue +// after a GC happens. +TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { // Use mock env to stop wall clock. Options options; options.env = mock_env_.get(); BlobDBOptions bdb_options; - bdb_options.is_fifo = true; bdb_options.blob_dir_size = 100; bdb_options.blob_file_size = 100; bdb_options.min_blob_size = 0; @@ -927,7 +930,7 @@ TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { } // Test to verify that a NoSpace IOError Status is returned on reaching -// blob_dir_size limit. +// blob_dir_size limit. TEST_F(BlobDBTest, OutOfSpace) { // Use mock env to stop wall clock. Options options; @@ -949,6 +952,41 @@ TEST_F(BlobDBTest, OutOfSpace) { ASSERT_TRUE(s.IsNoSpace()); } +TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) { + // Use mock env to stop wall clock. + Options options; + BlobDBOptions bdb_options; + bdb_options.blob_dir_size = 270; + bdb_options.blob_file_size = 100; + bdb_options.disable_background_tasks = true; + bdb_options.is_fifo = true; + Open(bdb_options); + + // Each stored blob has an overhead of about 32 bytes currently. + // So a 100 byte blob should take up 132 bytes. + std::string value(100, 'v'); + ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10)); + + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + + // Adding another 100 byte blob would take the total size to 264 bytes + // (2*132), which is more than 90% of blob_dir_size. So, the oldest file + // should be evicted and put in obsolete files list. + ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60)); + + auto obsolete_files = bdb_impl->TEST_GetObsoleteFiles(); + ASSERT_EQ(1, obsolete_files.size()); + ASSERT_TRUE(obsolete_files[0]->Immutable()); + ASSERT_EQ(blob_files[0]->BlobFileNumber(), + obsolete_files[0]->BlobFileNumber()); + + bdb_impl->TEST_DeleteObsoleteFiles(); + obsolete_files = bdb_impl->TEST_GetObsoleteFiles(); + ASSERT_TRUE(obsolete_files.empty()); +} + TEST_F(BlobDBTest, InlineSmallValues) { constexpr uint64_t kMaxExpiration = 1000; Random rnd(301);