From 7bfa88037e71137764c8a72464be5cfca903bb1b Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 2 Nov 2017 15:50:30 -0700 Subject: [PATCH] Blob DB: fix snapshot handling Summary: Blob db will keep blob file if data in the file is visible to an active snapshot. Before this patch it checks whether there is an active snapshot has sequence number greater than the earliest sequence in the file. This is problematic since we take snapshot on every read, if it keep having reads, old blob files will not be cleanup. Change to check if there is an active snapshot falls in the range of [earliest_sequence, obsolete_sequence) where obsolete sequence is 1. if data is relocated to another file by garbage collection, it is the latest sequence at the time garbage collection finish 2. otherwise, it is the latest sequence of the file Closes https://github.com/facebook/rocksdb/pull/3087 Differential Revision: D6182519 Pulled By: yiwu-arbug fbshipit-source-id: cdf4c35281f782eb2a9ad6a87b6727bbdff27a45 --- db/db_impl.cc | 8 +- db/db_impl.h | 4 +- db/snapshot_impl.h | 16 ++++ utilities/blob_db/blob_db_impl.cc | 90 +++++++++--------- utilities/blob_db/blob_db_impl.h | 3 +- utilities/blob_db/blob_db_test.cc | 148 +++++++++++++++++++++--------- utilities/blob_db/blob_file.cc | 18 ++-- utilities/blob_db/blob_file.h | 30 ++++-- 8 files changed, 207 insertions(+), 110 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index b4f6104a9..ad77d99bf 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1663,12 +1663,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { delete casted_s; } -bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) { +bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound, + SequenceNumber upper_bound) { InstrumentedMutexLock l(&mutex_); - if (snapshots_.empty()) { - return false; - } - return (snapshots_.newest()->GetSequenceNumber() >= sn); + return snapshots_.HasSnapshotInRange(lower_bound, upper_bound); } #ifndef ROCKSDB_LITE diff --git a/db/db_impl.h b/db/db_impl.h index c89e1fc5b..bef2e5c25 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -227,7 +227,9 @@ class DBImpl : public DB { virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override; - bool HasActiveSnapshotLaterThanSN(SequenceNumber sn); + // Whether there is an active snapshot in range [lower_bound, upper_bound). + bool HasActiveSnapshotInRange(SequenceNumber lower_bound, + SequenceNumber upper_bound); #ifndef ROCKSDB_LITE using DB::ResetStats; diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index ad9c1a9fb..7dc405931 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -108,6 +108,22 @@ class SnapshotList { return ret; } + // Whether there is an active snapshot in range [lower_bound, upper_bound). + bool HasSnapshotInRange(SequenceNumber lower_bound, + SequenceNumber upper_bound) { + if (empty()) { + return false; + } + const SnapshotImpl* s = &list_; + while (s->next_ != &list_) { + if (s->next_->number_ >= lower_bound) { + return s->next_->number_ < upper_bound; + } + s = s->next_; + } + return false; + } + // get the sequence number of the most recent snapshot SequenceNumber GetNewest() { if (empty()) { diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 9aeaadbae..9cece1522 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -956,10 +956,24 @@ bool BlobDBImpl::EvictOldestBlobFile() { } WriteLock wl(&mutex_); - oldest_file->SetCanBeDeleted(); - obsolete_files_.push_front(oldest_file); - oldest_file_evicted_.store(true); - return true; + // Double check the file is not obsolete by others + if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) { + auto expiration_range = oldest_file->GetExpirationRange(); + ROCKS_LOG_INFO(db_options_.info_log, + "Evict oldest blob file since DB out of space. Current " + "space used: %" PRIu64 ", blob dir size: %" PRIu64 + ", evicted blob file #%" PRIu64 + " with expiration range (%" PRIu64 ", %" PRIu64 ").", + total_blob_space_.load(), bdb_options_.blob_dir_size, + oldest_file->BlobFileNumber(), expiration_range.first, + expiration_range.second); + oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second); + obsolete_files_.push_back(oldest_file); + oldest_file_evicted_.store(true); + return true; + } + + return false; } Status BlobDBImpl::CheckSize(size_t blob_size) { @@ -1299,27 +1313,12 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { return CloseBlobFile(bfile); } -bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked( +bool BlobDBImpl::VisibleToActiveSnapshot( const std::shared_ptr& bfile) { assert(bfile->Obsolete()); - - SequenceNumber esn = bfile->GetSequenceRange().first; - - // TODO(yiwu): Here we should check instead if there is an active snapshot - // lies between the first sequence in the file, and the last sequence by - // the time the file finished being garbage collect. - bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn); - if (notok) { - ROCKS_LOG_INFO(db_options_.info_log, - "Could not delete file due to snapshot failure %s", - bfile->PathName().c_str()); - return false; - } else { - ROCKS_LOG_INFO(db_options_.info_log, - "Will delete file due to snapshot success %s", - bfile->PathName().c_str()); - return true; - } + SequenceNumber first_sequence = bfile->GetSequenceRange().first; + SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence(); + return db_impl_->HasActiveSnapshotInRange(first_sequence, obsolete_sequence); } bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, @@ -1697,7 +1696,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/, nullptr /*read_callback*/, &is_blob_index); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); - if (!get_status.ok() && !get_status.ok()) { + if (!get_status.ok() && !get_status.IsNotFound()) { // error s = get_status; ROCKS_LOG_ERROR(db_options_.info_log, @@ -1814,6 +1813,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, &rewrite_batch, &callback); } if (rewrite_status.ok()) { + newfile->ExtendSequenceRange( + WriteBatchInternal::Sequence(&rewrite_batch)); gc_stats->relocate_succeeded++; } else if (rewrite_status.IsBusy()) { // The key is overwritten in the meanwhile. Drop the blob record. @@ -1827,6 +1828,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } } // end of ReadRecord loop + if (s.ok()) { + SequenceNumber obsolete_sequence = + newfile == nullptr ? bfptr->GetSequenceRange().second + 1 + : newfile->GetSequenceRange().second; + bfptr->MarkObsolete(obsolete_sequence); + if (!first_gc) { + WriteLock wl(&mutex_); + obsolete_files_.push_back(bfptr); + } + } + ROCKS_LOG_INFO( db_options_.info_log, "%s blob file %" PRIu64 @@ -1935,11 +1947,17 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { auto bfile = *iter; { ReadLock lockbfile_r(&bfile->mutex_); - if (!FileDeleteOk_SnapshotCheckLocked(bfile)) { + if (VisibleToActiveSnapshot(bfile)) { + ROCKS_LOG_INFO(db_options_.info_log, + "Could not delete file due to snapshot failure %s", + bfile->PathName().c_str()); ++iter; continue; } } + ROCKS_LOG_INFO(db_options_.info_log, + "Will delete file due to snapshot success %s", + bfile->PathName().c_str()); blob_files_.erase(bfile->BlobFileNumber()); Status s = env_->DeleteFile(bfile->PathName()); @@ -2069,8 +2087,6 @@ std::pair BlobDBImpl::RunGC(bool aborted) { FilterSubsetOfFiles(blob_files, &to_process, current_epoch_, files_to_collect); - // in this collect the set of files, which became obsolete - std::vector> obsoletes; for (auto bfile : to_process) { GCStats gc_stats; Status s = GCFileAndUpdateLSM(bfile, &gc_stats); @@ -2084,16 +2100,6 @@ std::pair BlobDBImpl::RunGC(bool aborted) { bfile->deleted_size_ = gc_stats.deleted_size; bfile->deleted_count_ = gc_stats.num_deletes; bfile->gc_once_after_open_ = false; - } else { - obsoletes.push_back(bfile); - } - } - - if (!obsoletes.empty()) { - WriteLock wl(&mutex_); - for (auto bfile : obsoletes) { - bfile->SetCanBeDeleted(); - obsolete_files_.push_front(bfile); } } @@ -2190,16 +2196,6 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, } void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } - -void BlobDBImpl::TEST_ObsoleteFile(std::shared_ptr& bfile) { - uint64_t number = bfile->BlobFileNumber(); - assert(blob_files_.count(number) > 0); - bfile->SetCanBeDeleted(); - { - WriteLock l(&mutex_); - obsolete_files_.push_back(bfile); - } -} #endif // !NDEBUG } // namespace blob_db diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index fc36712be..9881107d3 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -279,8 +279,6 @@ class BlobDBImpl : public BlobDB { void TEST_RunGC(); - void TEST_ObsoleteFile(std::shared_ptr& bfile); - void TEST_DeleteObsoleteFiles(); #endif // !NDEBUG @@ -411,6 +409,7 @@ class BlobDBImpl : public BlobDB { // checks if there is no snapshot which is referencing the // blobs + bool VisibleToActiveSnapshot(const std::shared_ptr& file); bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr& bfile); bool MarkBlobDeleted(const Slice& key, const Slice& lsmValue); diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 9ffdb234f..1c9493565 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -63,6 +63,22 @@ class BlobDBTest : public testing::Test { } } + BlobDBImpl *blob_db_impl() { + return reinterpret_cast(blob_db_); + } + + Status Put(const Slice &key, const Slice &value) { + return blob_db_->Put(WriteOptions(), key, value); + } + + void Delete(const std::string &key, + std::map *data = nullptr) { + ASSERT_OK(blob_db_->Delete(WriteOptions(), key)); + if (data != nullptr) { + data->erase(key); + } + } + void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd, std::map *data = nullptr) { int len = rnd->Next() % kMaxBlobSize + 1; @@ -111,14 +127,6 @@ class BlobDBTest : public testing::Test { } } - void Delete(const std::string &key, - std::map *data = nullptr) { - ASSERT_OK(blob_db_->Delete(WriteOptions(), key)); - if (data != nullptr) { - data->erase(key); - } - } - // Verify blob db contain expected data and nothing more. void VerifyDB(const std::map &data) { VerifyDB(blob_db_, data); @@ -593,16 +601,14 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) { bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); - BlobDBImpl *blob_db_impl = - static_cast_with_check(blob_db_); DBImpl *db_impl = static_cast_with_check(blob_db_->GetBaseDB()); std::map data; for (int i = 0; i < 200; i++) { PutRandom("key" + ToString(i), &rnd, &data); } - auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); // Test for data in SST size_t new_keys = 0; for (int i = 0; i < 100; i++) { @@ -620,7 +626,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) { } } GCStats gc_stats; - ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(200, gc_stats.blob_count); ASSERT_EQ(0, gc_stats.num_deletes); ASSERT_EQ(200 - new_keys, gc_stats.num_relocate); @@ -634,11 +640,9 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { bdb_options.disable_background_tasks = true; Open(bdb_options); ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1")); - BlobDBImpl *blob_db_impl = - static_cast_with_check(blob_db_); - auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); SyncPoint::GetInstance()->LoadDependency( {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", @@ -651,7 +655,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); }); GCStats gc_stats; - ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(1, gc_stats.blob_count); ASSERT_EQ(0, gc_stats.num_deletes); ASSERT_EQ(1, gc_stats.num_relocate); @@ -671,11 +675,9 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { Open(bdb_options, options); mock_env_->set_current_time(100); ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200)); - BlobDBImpl *blob_db_impl = - static_cast_with_check(blob_db_); - auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); - ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); mock_env_->set_current_time(300); SyncPoint::GetInstance()->LoadDependency( @@ -690,7 +692,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { }); GCStats gc_stats; - ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(1, gc_stats.blob_count); ASSERT_EQ(1, gc_stats.num_deletes); ASSERT_EQ(0, gc_stats.delete_succeeded); @@ -719,9 +721,7 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { for (int i = 0; i < 10; i++) { ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value)); } - BlobDBImpl *blob_db_impl = - static_cast_with_check(blob_db_); - auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(11, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); ASSERT_TRUE(blob_files[0]->Immutable()); @@ -731,9 +731,9 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { ASSERT_TRUE(blob_files[i]->Immutable()); } } - blob_db_impl->TEST_RunGC(); + blob_db_impl()->TEST_RunGC(); // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC. - auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles(); + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); ASSERT_EQ(1, obsolete_files.size()); ASSERT_EQ(blob_files[1]->BlobFileNumber(), obsolete_files[0]->BlobFileNumber()); @@ -747,13 +747,11 @@ TEST_F(BlobDBTest, ReadWhileGC) { bdb_options.disable_background_tasks = true; Open(bdb_options); blob_db_->Put(WriteOptions(), "foo", "bar"); - BlobDBImpl *blob_db_impl = - static_cast_with_check(blob_db_); - auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); std::shared_ptr bfile = blob_files[0]; uint64_t bfile_number = bfile->BlobFileNumber(); - ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile)); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); switch (i) { case 0: @@ -791,17 +789,16 @@ TEST_F(BlobDBTest, ReadWhileGC) { TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1"); GCStats gc_stats; - ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); ASSERT_EQ(1, gc_stats.blob_count); ASSERT_EQ(1, gc_stats.num_relocate); ASSERT_EQ(1, gc_stats.relocate_succeeded); - blob_db_impl->TEST_ObsoleteFile(blob_files[0]); - blob_db_impl->TEST_DeleteObsoleteFiles(); + blob_db_impl()->TEST_DeleteObsoleteFiles(); // The file shouln't be deleted - blob_files = blob_db_impl->TEST_GetBlobFiles(); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(2, blob_files.size()); ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber()); - auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles(); + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); ASSERT_EQ(1, obsolete_files.size()); ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber()); TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2"); @@ -809,16 +806,85 @@ TEST_F(BlobDBTest, ReadWhileGC) { SyncPoint::GetInstance()->DisableProcessing(); // The file is deleted this time - blob_db_impl->TEST_DeleteObsoleteFiles(); - blob_files = blob_db_impl->TEST_GetBlobFiles(); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber()); - ASSERT_EQ(0, blob_db_impl->TEST_GetObsoleteFiles().size()); + ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size()); VerifyDB({{"foo", "bar"}}); Destroy(); } } +TEST_F(BlobDBTest, SnapshotAndGarbageCollection) { + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + // i = when to take snapshot + for (int i = 0; i < 4; i++) { + for (bool delete_key : {true, false}) { + const Snapshot *snapshot = nullptr; + Destroy(); + Open(bdb_options); + // First file + ASSERT_OK(Put("key1", "value")); + if (i == 0) { + snapshot = blob_db_->GetSnapshot(); + } + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + // Second file + ASSERT_OK(Put("key2", "value")); + if (i == 1) { + snapshot = blob_db_->GetSnapshot(); + } + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + auto bfile = blob_files[1]; + ASSERT_FALSE(bfile->Immutable()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile)); + // Third file + ASSERT_OK(Put("key3", "value")); + if (i == 2) { + snapshot = blob_db_->GetSnapshot(); + } + if (delete_key) { + Delete("key2"); + } + GCStats gc_stats; + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats)); + ASSERT_TRUE(bfile->Obsolete()); + ASSERT_EQ(1, gc_stats.blob_count); + if (delete_key) { + ASSERT_EQ(0, gc_stats.num_relocate); + ASSERT_EQ(bfile->GetSequenceRange().second + 1, + bfile->GetObsoleteSequence()); + } else { + ASSERT_EQ(1, gc_stats.num_relocate); + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), + bfile->GetObsoleteSequence()); + } + if (i == 3) { + snapshot = blob_db_->GetSnapshot(); + } + size_t num_files = delete_key ? 3 : 4; + ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + if (i == 0 || i == 3 || (i == 2 && delete_key)) { + // The snapshot shouldn't see data in bfile + ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size()); + } else { + // The snapshot will see data in bfile, so the file shouldn't be deleted + ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_->ReleaseSnapshot(snapshot); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size()); + } + } + } +} + TEST_F(BlobDBTest, ColumnFamilyNotSupported) { Options options; options.env = mock_env_.get(); @@ -962,7 +1028,7 @@ TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) { bdb_options.is_fifo = true; Open(bdb_options); - // Each stored blob has an overhead of about 32 bytes currently. + // Each stored blob has an overhead of 32 bytes currently. // So a 100 byte blob should take up 132 bytes. std::string value(100, 'v'); ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10)); diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index d50256ca6..bbd885725 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -36,7 +36,7 @@ BlobFile::BlobFile() deleted_count_(0), deleted_size_(0), closed_(false), - can_be_deleted_(false), + obsolete_(false), gc_once_after_open_(false), expiration_range_({0, 0}), sequence_range_({kMaxSequenceNumber, 0}), @@ -55,7 +55,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) deleted_count_(0), deleted_size_(0), closed_(false), - can_be_deleted_(false), + obsolete_(false), gc_once_after_open_(false), expiration_range_({0, 0}), sequence_range_({kMaxSequenceNumber, 0}), @@ -64,7 +64,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) header_valid_(false) {} BlobFile::~BlobFile() { - if (can_be_deleted_) { + if (obsolete_) { std::string pn(PathName()); Status s = Env::Default()->DeleteFile(PathName()); if (!s.ok()) { @@ -110,17 +110,21 @@ std::string BlobFile::DumpState() const { "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64 " file_size: %" PRIu64 " deleted_count: %" PRIu64 " deleted_size: %" PRIu64 - " closed: %d can_be_deleted: %d expiration_range: (%" PRIu64 - ", %" PRIu64 ") sequence_range: (%" PRIu64 " %" PRIu64 - "), writer: %d reader: %d", + " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 + ") sequence_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d", path_to_dir_.c_str(), file_number_, blob_count_.load(), gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_, - closed_.load(), can_be_deleted_.load(), expiration_range_.first, + closed_.load(), obsolete_.load(), expiration_range_.first, expiration_range_.second, sequence_range_.first, sequence_range_.second, (!!log_writer_), (!!ra_file_reader_)); return str; } +void BlobFile::MarkObsolete(SequenceNumber sequence) { + obsolete_sequence_ = sequence; + obsolete_.store(true); +} + bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const { assert(last_fsync_ <= file_size_); return (hard) ? file_size_ > last_fsync_ diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 455383448..239e8e1c5 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -63,8 +63,12 @@ class BlobFile { std::atomic closed_; // has a pass of garbage collection successfully finished on this file - // can_be_deleted_ still needs to do iterator/snapshot checks - std::atomic can_be_deleted_; + // obsolete_ still needs to do iterator/snapshot checks + std::atomic obsolete_; + + // The last sequence number by the time the file marked as obsolete. + // Data in this file is visible to a snapshot taken before the sequence. + SequenceNumber obsolete_sequence_; // should this file been gc'd once to reconcile lost deletes/compactions std::atomic gc_once_after_open_; @@ -91,6 +95,8 @@ class BlobFile { bool header_valid_; + SequenceNumber garbage_collection_finish_sequence_; + public: BlobFile(); @@ -117,7 +123,19 @@ class BlobFile { std::string DumpState() const; // if the file has gone through GC and blobs have been relocated - bool Obsolete() const { return can_be_deleted_.load(); } + bool Obsolete() const { + assert(Immutable() || !obsolete_.load()); + return obsolete_.load(); + } + + // Mark file as obsolete by garbage collection. The file is not visible to + // snapshots with sequence greater or equal to the given sequence. + void MarkObsolete(SequenceNumber sequence); + + SequenceNumber GetObsoleteSequence() const { + assert(Obsolete()); + return obsolete_sequence_; + } // if the file is not taking any more appends. bool Immutable() const { return closed_.load(); } @@ -125,6 +143,8 @@ class BlobFile { // we will assume this is atomic bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const; + void Fsync(); + uint64_t GetFileSize() const { return file_size_.load(std::memory_order_acquire); } @@ -155,8 +175,6 @@ class BlobFile { std::shared_ptr GetWriter() const { return log_writer_; } - void Fsync(); - private: std::shared_ptr OpenSequentialReader( Env* env, const DBOptions& db_options, @@ -183,8 +201,6 @@ class BlobFile { void SetFileSize(uint64_t fs) { file_size_ = fs; } void SetBlobCount(uint64_t bc) { blob_count_ = bc; } - - void SetCanBeDeleted() { can_be_deleted_ = true; } }; } // namespace blob_db } // namespace rocksdb