diff --git a/db/db_impl.cc b/db/db_impl.cc
index b4f6104a9..ad77d99bf 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1663,12 +1663,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) {
   delete casted_s;
 }
 
-bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
+bool DBImpl::HasActiveSnapshotInRange(SequenceNumber lower_bound,
+                                      SequenceNumber upper_bound) {
   InstrumentedMutexLock l(&mutex_);
-  if (snapshots_.empty()) {
-    return false;
-  }
-  return (snapshots_.newest()->GetSequenceNumber() >= sn);
+  return snapshots_.HasSnapshotInRange(lower_bound, upper_bound);
 }
 
 #ifndef ROCKSDB_LITE
diff --git a/db/db_impl.h b/db/db_impl.h
index c89e1fc5b..bef2e5c25 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -227,7 +227,9 @@ class DBImpl : public DB {
 
   virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
 
-  bool HasActiveSnapshotLaterThanSN(SequenceNumber sn);
+  // Whether there is an active snapshot in range [lower_bound, upper_bound).
+  bool HasActiveSnapshotInRange(SequenceNumber lower_bound,
+                                SequenceNumber upper_bound);
 
 #ifndef ROCKSDB_LITE
   using DB::ResetStats;
diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h
index ad9c1a9fb..7dc405931 100644
--- a/db/snapshot_impl.h
+++ b/db/snapshot_impl.h
@@ -108,6 +108,22 @@ class SnapshotList {
     return ret;
   }
 
+  // Whether there is an active snapshot in range [lower_bound, upper_bound).
+  bool HasSnapshotInRange(SequenceNumber lower_bound,
+                          SequenceNumber upper_bound) {
+    if (empty()) {
+      return false;
+    }
+    const SnapshotImpl* s = &list_;
+    while (s->next_ != &list_) {
+      if (s->next_->number_ >= lower_bound) {
+        return s->next_->number_ < upper_bound;
+      }
+      s = s->next_;
+    }
+    return false;
+  }
+
   // get the sequence number of the most recent snapshot
   SequenceNumber GetNewest() {
     if (empty()) {
diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index 9aeaadbae..9cece1522 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -956,10 +956,24 @@ bool BlobDBImpl::EvictOldestBlobFile() {
   }
 
   WriteLock wl(&mutex_);
-  oldest_file->SetCanBeDeleted();
-  obsolete_files_.push_front(oldest_file);
-  oldest_file_evicted_.store(true);
-  return true;
+  // Double check the file is not obsolete by others
+  if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) {
+    auto expiration_range = oldest_file->GetExpirationRange();
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "Evict oldest blob file since DB out of space. Current "
+                   "space used: %" PRIu64 ", blob dir size: %" PRIu64
+                   ", evicted blob file #%" PRIu64
+                   " with expiration range (%" PRIu64 ", %" PRIu64 ").",
+                   total_blob_space_.load(), bdb_options_.blob_dir_size,
+                   oldest_file->BlobFileNumber(), expiration_range.first,
+                   expiration_range.second);
+    oldest_file->MarkObsolete(oldest_file->GetSequenceRange().second);
+    obsolete_files_.push_back(oldest_file);
+    oldest_file_evicted_.store(true);
+    return true;
+  }
+
+  return false;
 }
 
 Status BlobDBImpl::CheckSize(size_t blob_size) {
@@ -1299,27 +1313,12 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
   return CloseBlobFile(bfile);
 }
 
-bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
+bool BlobDBImpl::VisibleToActiveSnapshot(
     const std::shared_ptr<BlobFile>& bfile) {
   assert(bfile->Obsolete());
-
-  SequenceNumber esn = bfile->GetSequenceRange().first;
-
-  // TODO(yiwu): Here we should check instead if there is an active snapshot
-  // lies between the first sequence in the file, and the last sequence by
-  // the time the file finished being garbage collect.
-  bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn);
-  if (notok) {
-    ROCKS_LOG_INFO(db_options_.info_log,
-                   "Could not delete file due to snapshot failure %s",
-                   bfile->PathName().c_str());
-    return false;
-  } else {
-    ROCKS_LOG_INFO(db_options_.info_log,
-                   "Will delete file due to snapshot success %s",
-                   bfile->PathName().c_str());
-    return true;
-  }
+  SequenceNumber first_sequence = bfile->GetSequenceRange().first;
+  SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
+  return db_impl_->HasActiveSnapshotInRange(first_sequence, obsolete_sequence);
 }
 
 bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
@@ -1697,7 +1696,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
         ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
         nullptr /*read_callback*/, &is_blob_index);
     TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
-    if (!get_status.ok() && !get_status.ok()) {
+    if (!get_status.ok() && !get_status.IsNotFound()) {
      // error
      s = get_status;
      ROCKS_LOG_ERROR(db_options_.info_log,
@@ -1814,6 +1813,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
                                                      &rewrite_batch, &callback);
       }
       if (rewrite_status.ok()) {
+        newfile->ExtendSequenceRange(
+            WriteBatchInternal::Sequence(&rewrite_batch));
         gc_stats->relocate_succeeded++;
       } else if (rewrite_status.IsBusy()) {
         // The key is overwritten in the meanwhile. Drop the blob record.
@@ -1827,6 +1828,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
     }
   }  // end of ReadRecord loop
 
+  if (s.ok()) {
+    SequenceNumber obsolete_sequence =
+        newfile == nullptr ? bfptr->GetSequenceRange().second + 1
+                           : newfile->GetSequenceRange().second;
+    bfptr->MarkObsolete(obsolete_sequence);
+    if (!first_gc) {
+      WriteLock wl(&mutex_);
+      obsolete_files_.push_back(bfptr);
+    }
+  }
+
   ROCKS_LOG_INFO(
       db_options_.info_log,
       "%s blob file %" PRIu64
@@ -1935,11 +1947,17 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
     auto bfile = *iter;
     {
       ReadLock lockbfile_r(&bfile->mutex_);
-      if (!FileDeleteOk_SnapshotCheckLocked(bfile)) {
+      if (VisibleToActiveSnapshot(bfile)) {
+        ROCKS_LOG_INFO(db_options_.info_log,
+                       "Could not delete file due to snapshot failure %s",
+                       bfile->PathName().c_str());
        ++iter;
        continue;
      }
    }
 
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "Will delete file due to snapshot success %s",
+                   bfile->PathName().c_str());
     blob_files_.erase(bfile->BlobFileNumber());
     Status s = env_->DeleteFile(bfile->PathName());
@@ -2069,8 +2087,6 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
   FilterSubsetOfFiles(blob_files, &to_process, current_epoch_,
                       files_to_collect);
 
-  // in this collect the set of files, which became obsolete
-  std::vector<std::shared_ptr<BlobFile>> obsoletes;
   for (auto bfile : to_process) {
     GCStats gc_stats;
     Status s = GCFileAndUpdateLSM(bfile, &gc_stats);
@@ -2084,16 +2100,6 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
       bfile->deleted_size_ = gc_stats.deleted_size;
       bfile->deleted_count_ = gc_stats.num_deletes;
       bfile->gc_once_after_open_ = false;
-    } else {
-      obsoletes.push_back(bfile);
-    }
-  }
-
-  if (!obsoletes.empty()) {
-    WriteLock wl(&mutex_);
-    for (auto bfile : obsoletes) {
-      bfile->SetCanBeDeleted();
-      obsolete_files_.push_front(bfile);
     }
   }
 
@@ -2190,16 +2196,6 @@ Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
 }
 
 void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
-
-void BlobDBImpl::TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile) {
-  uint64_t number = bfile->BlobFileNumber();
-  assert(blob_files_.count(number) > 0);
-  bfile->SetCanBeDeleted();
-  {
-    WriteLock l(&mutex_);
-    obsolete_files_.push_back(bfile);
-  }
-}
 #endif  // !NDEBUG
 
 }  // namespace blob_db
diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h
index fc36712be..9881107d3 100644
--- a/utilities/blob_db/blob_db_impl.h
+++ b/utilities/blob_db/blob_db_impl.h
@@ -279,8 +279,6 @@ class BlobDBImpl : public BlobDB {
 
   void TEST_RunGC();
 
-  void TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile);
-
   void TEST_DeleteObsoleteFiles();
 #endif  // !NDEBUG
 
@@ -411,6 +409,7 @@ class BlobDBImpl : public BlobDB {
 
   // checks if there is no snapshot which is referencing the
   // blobs
+  bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
   bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
 
   bool MarkBlobDeleted(const Slice& key, const Slice& lsmValue);
diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc
index 9ffdb234f..1c9493565 100644
--- a/utilities/blob_db/blob_db_test.cc
+++ b/utilities/blob_db/blob_db_test.cc
@@ -63,6 +63,22 @@ class BlobDBTest : public testing::Test {
     }
   }
 
+  BlobDBImpl *blob_db_impl() {
+    return reinterpret_cast<BlobDBImpl *>(blob_db_);
+  }
+
+  Status Put(const Slice &key, const Slice &value) {
+    return blob_db_->Put(WriteOptions(), key, value);
+  }
+
+  void Delete(const std::string &key,
+              std::map<std::string, std::string> *data = nullptr) {
+    ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
+    if (data != nullptr) {
+      data->erase(key);
+    }
+  }
+
   void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
                         std::map<std::string, std::string> *data = nullptr) {
     int len = rnd->Next() % kMaxBlobSize + 1;
@@ -111,14 +127,6 @@ class BlobDBTest : public testing::Test {
     }
   }
 
-  void Delete(const std::string &key,
-              std::map<std::string, std::string> *data = nullptr) {
-    ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
-    if (data != nullptr) {
-      data->erase(key);
-    }
-  }
-
   // Verify blob db contain expected data and nothing more.
   void VerifyDB(const std::map<std::string, std::string> &data) {
     VerifyDB(blob_db_, data);
@@ -593,16 +601,14 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
   bdb_options.min_blob_size = 0;
   bdb_options.disable_background_tasks = true;
   Open(bdb_options);
-  BlobDBImpl *blob_db_impl =
-      static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
   DBImpl *db_impl = static_cast_with_check<DBImpl, DB>(blob_db_->GetBaseDB());
   std::map<std::string, std::string> data;
   for (int i = 0; i < 200; i++) {
     PutRandom("key" + ToString(i), &rnd, &data);
   }
-  auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
-  ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
+  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
   // Test for data in SST
   size_t new_keys = 0;
   for (int i = 0; i < 100; i++) {
@@ -620,7 +626,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
     }
   }
   GCStats gc_stats;
-  ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(200, gc_stats.blob_count);
   ASSERT_EQ(0, gc_stats.num_deletes);
   ASSERT_EQ(200 - new_keys, gc_stats.num_relocate);
@@ -634,11 +640,9 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
   bdb_options.disable_background_tasks = true;
   Open(bdb_options);
   ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
-  BlobDBImpl *blob_db_impl =
-      static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
-  auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
-  ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
+  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
 
   SyncPoint::GetInstance()->LoadDependency(
       {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
@@ -651,7 +655,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
       [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
 
   GCStats gc_stats;
-  ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(1, gc_stats.blob_count);
   ASSERT_EQ(0, gc_stats.num_deletes);
   ASSERT_EQ(1, gc_stats.num_relocate);
@@ -671,11 +675,9 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
   Open(bdb_options, options);
   mock_env_->set_current_time(100);
   ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
-  BlobDBImpl *blob_db_impl =
-      static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
-  auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
   ASSERT_EQ(1, blob_files.size());
-  ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
+  ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
   mock_env_->set_current_time(300);
 
   SyncPoint::GetInstance()->LoadDependency(
@@ -690,7 +692,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
   });
 
   GCStats gc_stats;
-  ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(1, gc_stats.blob_count);
   ASSERT_EQ(1, gc_stats.num_deletes);
   ASSERT_EQ(0, gc_stats.delete_succeeded);
@@ -719,9 +721,7 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
   for (int i = 0; i < 10; i++) {
     ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value));
   }
-  BlobDBImpl *blob_db_impl =
-      static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
-  auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+  auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
   ASSERT_EQ(11, blob_files.size());
   ASSERT_TRUE(blob_files[0]->HasTTL());
   ASSERT_TRUE(blob_files[0]->Immutable());
@@ -731,9 +731,9 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
       ASSERT_TRUE(blob_files[i]->Immutable());
     }
   }
-  blob_db_impl->TEST_RunGC();
+  blob_db_impl()->TEST_RunGC();
   // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC.
-  auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles();
+  auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
   ASSERT_EQ(1, obsolete_files.size());
   ASSERT_EQ(blob_files[1]->BlobFileNumber(),
             obsolete_files[0]->BlobFileNumber());
@@ -747,13 +747,11 @@ TEST_F(BlobDBTest, ReadWhileGC) {
     bdb_options.disable_background_tasks = true;
     Open(bdb_options);
     blob_db_->Put(WriteOptions(), "foo", "bar");
-    BlobDBImpl *blob_db_impl =
-        static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
-    auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+    auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
     ASSERT_EQ(1, blob_files.size());
     std::shared_ptr<BlobFile> bfile = blob_files[0];
     uint64_t bfile_number = bfile->BlobFileNumber();
-    ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile));
+    ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
 
     switch (i) {
       case 0:
@@ -791,17 +789,16 @@ TEST_F(BlobDBTest, ReadWhileGC) {
 
     TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
     GCStats gc_stats;
-    ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+    ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
     ASSERT_EQ(1, gc_stats.blob_count);
     ASSERT_EQ(1, gc_stats.num_relocate);
     ASSERT_EQ(1, gc_stats.relocate_succeeded);
-    blob_db_impl->TEST_ObsoleteFile(blob_files[0]);
-    blob_db_impl->TEST_DeleteObsoleteFiles();
+    blob_db_impl()->TEST_DeleteObsoleteFiles();
     // The file shouln't be deleted
-    blob_files = blob_db_impl->TEST_GetBlobFiles();
+    blob_files = blob_db_impl()->TEST_GetBlobFiles();
     ASSERT_EQ(2, blob_files.size());
     ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
-    auto obsolete_files = blob_db_impl->TEST_GetObsoleteFiles();
+    auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
     ASSERT_EQ(1, obsolete_files.size());
     ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
     TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
@@ -809,16 +806,85 @@ TEST_F(BlobDBTest, ReadWhileGC) {
     SyncPoint::GetInstance()->DisableProcessing();
 
     // The file is deleted this time
-    blob_db_impl->TEST_DeleteObsoleteFiles();
-    blob_files = blob_db_impl->TEST_GetBlobFiles();
+    blob_db_impl()->TEST_DeleteObsoleteFiles();
+    blob_files = blob_db_impl()->TEST_GetBlobFiles();
     ASSERT_EQ(1, blob_files.size());
     ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
-    ASSERT_EQ(0, blob_db_impl->TEST_GetObsoleteFiles().size());
+    ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
     VerifyDB({{"foo", "bar"}});
     Destroy();
   }
 }
 
+TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
+  BlobDBOptions bdb_options;
+  bdb_options.min_blob_size = 0;
+  bdb_options.disable_background_tasks = true;
+  // i = when to take snapshot
+  for (int i = 0; i < 4; i++) {
+    for (bool delete_key : {true, false}) {
+      const Snapshot *snapshot = nullptr;
+      Destroy();
+      Open(bdb_options);
+      // First file
+      ASSERT_OK(Put("key1", "value"));
+      if (i == 0) {
+        snapshot = blob_db_->GetSnapshot();
+      }
+      auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+      ASSERT_EQ(1, blob_files.size());
+      ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+      // Second file
+      ASSERT_OK(Put("key2", "value"));
+      if (i == 1) {
+        snapshot = blob_db_->GetSnapshot();
+      }
+      blob_files = blob_db_impl()->TEST_GetBlobFiles();
+      ASSERT_EQ(2, blob_files.size());
+      auto bfile = blob_files[1];
+      ASSERT_FALSE(bfile->Immutable());
+      ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+      // Third file
+      ASSERT_OK(Put("key3", "value"));
+      if (i == 2) {
+        snapshot = blob_db_->GetSnapshot();
+      }
+      if (delete_key) {
+        Delete("key2");
+      }
+      GCStats gc_stats;
+      ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+      ASSERT_TRUE(bfile->Obsolete());
+      ASSERT_EQ(1, gc_stats.blob_count);
+      if (delete_key) {
+        ASSERT_EQ(0, gc_stats.num_relocate);
+        ASSERT_EQ(bfile->GetSequenceRange().second + 1,
+                  bfile->GetObsoleteSequence());
+      } else {
+        ASSERT_EQ(1, gc_stats.num_relocate);
+        ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
+                  bfile->GetObsoleteSequence());
+      }
+      if (i == 3) {
+        snapshot = blob_db_->GetSnapshot();
+      }
+      size_t num_files = delete_key ? 3 : 4;
+      ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+      blob_db_impl()->TEST_DeleteObsoleteFiles();
+      if (i == 0 || i == 3 || (i == 2 && delete_key)) {
+        // The snapshot shouldn't see data in bfile
+        ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
+      } else {
+        // The snapshot will see data in bfile, so the file shouldn't be deleted
+        ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+        blob_db_->ReleaseSnapshot(snapshot);
+        blob_db_impl()->TEST_DeleteObsoleteFiles();
+        ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
+      }
+    }
+  }
+}
+
 TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
   Options options;
   options.env = mock_env_.get();
@@ -962,7 +1028,7 @@ TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) {
   bdb_options.is_fifo = true;
   Open(bdb_options);
 
-  // Each stored blob has an overhead of about 32 bytes currently.
+  // Each stored blob has an overhead of 32 bytes currently.
   // So a 100 byte blob should take up 132 bytes.
   std::string value(100, 'v');
   ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc
index d50256ca6..bbd885725 100644
--- a/utilities/blob_db/blob_file.cc
+++ b/utilities/blob_db/blob_file.cc
@@ -36,7 +36,7 @@ BlobFile::BlobFile()
       deleted_count_(0),
       deleted_size_(0),
       closed_(false),
-      can_be_deleted_(false),
+      obsolete_(false),
       gc_once_after_open_(false),
       expiration_range_({0, 0}),
       sequence_range_({kMaxSequenceNumber, 0}),
@@ -55,7 +55,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
       deleted_count_(0),
       deleted_size_(0),
       closed_(false),
-      can_be_deleted_(false),
+      obsolete_(false),
       gc_once_after_open_(false),
       expiration_range_({0, 0}),
       sequence_range_({kMaxSequenceNumber, 0}),
@@ -64,7 +64,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
       header_valid_(false) {}
 
 BlobFile::~BlobFile() {
-  if (can_be_deleted_) {
+  if (obsolete_) {
     std::string pn(PathName());
     Status s = Env::Default()->DeleteFile(PathName());
     if (!s.ok()) {
@@ -110,17 +110,21 @@ std::string BlobFile::DumpState() const {
            "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64
            " file_size: %" PRIu64 " deleted_count: %" PRIu64
            " deleted_size: %" PRIu64
-           " closed: %d can_be_deleted: %d expiration_range: (%" PRIu64
-           ", %" PRIu64 ") sequence_range: (%" PRIu64 " %" PRIu64
-           "), writer: %d reader: %d",
+           " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
+           ") sequence_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d",
            path_to_dir_.c_str(), file_number_, blob_count_.load(),
            gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_,
-           closed_.load(), can_be_deleted_.load(), expiration_range_.first,
+           closed_.load(), obsolete_.load(), expiration_range_.first,
           expiration_range_.second, sequence_range_.first, sequence_range_.second,
           (!!log_writer_), (!!ra_file_reader_));
   return str;
 }
 
+void BlobFile::MarkObsolete(SequenceNumber sequence) {
+  obsolete_sequence_ = sequence;
+  obsolete_.store(true);
+}
+
 bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
   assert(last_fsync_ <= file_size_);
   return (hard) ? file_size_ > last_fsync_
diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h
index 455383448..239e8e1c5 100644
--- a/utilities/blob_db/blob_file.h
+++ b/utilities/blob_db/blob_file.h
@@ -63,8 +63,12 @@ class BlobFile {
   std::atomic<bool> closed_;
 
   // has a pass of garbage collection successfully finished on this file
-  // can_be_deleted_ still needs to do iterator/snapshot checks
-  std::atomic<bool> can_be_deleted_;
+  // obsolete_ still needs to do iterator/snapshot checks
+  std::atomic<bool> obsolete_;
+
+  // The last sequence number by the time the file is marked as obsolete.
+  // Data in this file is visible to a snapshot taken before the sequence.
+  SequenceNumber obsolete_sequence_;
 
   // should this file been gc'd once to reconcile lost deletes/compactions
   std::atomic<bool> gc_once_after_open_;
@@ -91,6 +95,8 @@ class BlobFile {
 
   bool header_valid_;
 
+  SequenceNumber garbage_collection_finish_sequence_;
+
  public:
 
   BlobFile();
@@ -117,7 +123,19 @@ class BlobFile {
   std::string DumpState() const;
 
   // if the file has gone through GC and blobs have been relocated
-  bool Obsolete() const { return can_be_deleted_.load(); }
+  bool Obsolete() const {
+    assert(Immutable() || !obsolete_.load());
+    return obsolete_.load();
+  }
+
+  // Mark file as obsolete by garbage collection. The file is not visible to
+  // snapshots with sequence greater than or equal to the given sequence.
+  void MarkObsolete(SequenceNumber sequence);
+
+  SequenceNumber GetObsoleteSequence() const {
+    assert(Obsolete());
+    return obsolete_sequence_;
+  }
 
   // if the file is not taking any more appends.
   bool Immutable() const { return closed_.load(); }
@@ -125,6 +143,8 @@ class BlobFile {
   // we will assume this is atomic
   bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
 
+  void Fsync();
+
   uint64_t GetFileSize() const {
     return file_size_.load(std::memory_order_acquire);
   }
@@ -155,8 +175,6 @@ class BlobFile {
 
   std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
 
-  void Fsync();
-
  private:
   std::shared_ptr<Reader> OpenSequentialReader(
       Env* env, const DBOptions& db_options,
@@ -183,8 +201,6 @@ class BlobFile {
   void SetFileSize(uint64_t fs) { file_size_ = fs; }
 
   void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
-
-  void SetCanBeDeleted() { can_be_deleted_ = true; }
 };
 }  // namespace blob_db
 }  // namespace rocksdb
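Not part of the patch: a minimal standalone sketch of the visibility check the patch relies on. A blob file whose data spans sequences [first_sequence, last] and that was marked obsolete at obsolete_sequence may only be deleted once no active snapshot holds a sequence in [first_sequence, obsolete_sequence). The sketch uses a plain std::set in place of RocksDB's ordered SnapshotList; the container choice and names are illustrative only, not the library API.

#include <cassert>
#include <cstdint>
#include <set>

using SequenceNumber = uint64_t;

// Illustrative stand-in for the range check: snapshots are kept in ascending
// sequence order, so the first snapshot at or above lower_bound decides
// whether any snapshot falls inside [lower_bound, upper_bound).
bool HasSnapshotInRange(const std::set<SequenceNumber>& snapshots,
                        SequenceNumber lower_bound,
                        SequenceNumber upper_bound) {
  auto it = snapshots.lower_bound(lower_bound);  // first snapshot >= lower_bound
  return it != snapshots.end() && *it < upper_bound;
}

int main() {
  std::set<SequenceNumber> snapshots = {5, 42};
  // Blob file covering sequences [10, 20], marked obsolete at sequence 30:
  // snapshot 5 predates the file and snapshot 42 was taken after GC finished,
  // so it already reads the relocated data; the file can be deleted.
  assert(!HasSnapshotInRange(snapshots, 10, 30));
  // A snapshot taken at sequence 15 still reads the old blob index entries,
  // so the file must be kept.
  snapshots.insert(15);
  assert(HasSnapshotInRange(snapshots, 10, 30));
  return 0;
}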