diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index d787529b1..553f89f2a 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -891,7 +891,10 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { } if (blob_inserter.has_put()) { - CloseIf(blob_inserter.last_file()); + s = CloseBlobFileIfNeeded(blob_inserter.last_file()); + if (!s.ok()) { + return s; + } } // add deleted key to list of keys that have been deleted for book-keeping @@ -1022,7 +1025,9 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, extendTTL(&(bfile->ttl_range_), expiration); } - CloseIf(bfile); + if (s.ok()) { + s = CloseBlobFileIfNeeded(bfile); + } TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish"); return s; @@ -1362,58 +1367,44 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { return std::make_pair(true, -1); } -std::pair BlobDBImpl::CloseSeqWrite( - std::shared_ptr bfile, bool aborted) { +Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { + Status s; + ROCKS_LOG_INFO(db_options_.info_log, "Close blob file %" PRIu64, + bfile->BlobFileNumber()); { WriteLock wl(&mutex_); - // this prevents others from picking up this file - open_blob_files_.erase(bfile); - - auto findit = - std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile); - if (findit != open_simple_files_.end()) open_simple_files_.erase(findit); + if (bfile->HasTTL()) { + size_t erased __attribute__((__unused__)) = open_blob_files_.erase(bfile); + assert(erased == 1); + } else { + auto iter = std::find(open_simple_files_.begin(), + open_simple_files_.end(), bfile); + assert(iter != open_simple_files_.end()); + open_simple_files_.erase(iter); + } } if (!bfile->closed_.load()) { WriteLock lockbfile_w(&bfile->mutex_); - bfile->WriteFooterAndCloseLocked(); + s = bfile->WriteFooterAndCloseLocked(); } - return std::make_pair(false, -1); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to close blob file %" PRIu64 "with error: %s", + bfile->BlobFileNumber(), s.ToString().c_str()); + } + + return s; } -void BlobDBImpl::CloseIf(const std::shared_ptr& bfile) { +Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { // atomic read - bool close = bfile->GetFileSize() > bdb_options_.blob_file_size; - if (!close) return; - - if (debug_level_ >= 2) { - ROCKS_LOG_DEBUG(db_options_.info_log, - "Scheduling file for close %s fsize: %" PRIu64 - " limit: %" PRIu64, - bfile->PathName().c_str(), bfile->GetFileSize(), - bdb_options_.blob_file_size); + if (bfile->GetFileSize() < bdb_options_.blob_file_size) { + return Status::OK(); } - - { - WriteLock wl(&mutex_); - - open_blob_files_.erase(bfile); - auto findit = - std::find(open_simple_files_.begin(), open_simple_files_.end(), bfile); - if (findit != open_simple_files_.end()) { - open_simple_files_.erase(findit); - } else { - ROCKS_LOG_WARN(db_options_.info_log, - "File not found while closing %s fsize: %" PRIu64 - " Multithreaded Writes?", - bfile->PathName().c_str(), bfile->GetFileSize()); - } - } - - tqueue_.add(0, std::bind(&BlobDBImpl::CloseSeqWrite, this, bfile, - std::placeholders::_1)); + return CloseBlobFile(bfile); } bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked( @@ -1585,7 +1576,7 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { } for (auto bfile : process_files) { - CloseSeqWrite(bfile, false); + CloseBlobFile(bfile); } return std::make_pair(true, -1); @@ -1916,7 +1907,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, delete transaction; } ROCKS_LOG_INFO( - db_options_.info_log, "%s blob file %" PRIu64 ".", + db_options_.info_log, + "%s blob file %" PRIu64 ". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64 " succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.", s.ok() ? "Successfully garbage collected" : "Failed to garbage collect", @@ -2334,8 +2326,8 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() { DeleteObsoleteFiles(false /*abort*/); } -void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { - CloseSeqWrite(bfile, false /*abort*/); +Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { + return CloseBlobFile(bfile); } Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 9886dbe5b..e7c49b20d 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -263,7 +263,7 @@ class BlobDBImpl : public BlobDB { std::vector> TEST_GetObsoleteFiles() const; - void TEST_CloseBlobFile(std::shared_ptr& bfile); + Status TEST_CloseBlobFile(std::shared_ptr& bfile); Status TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats); @@ -293,11 +293,6 @@ class BlobDBImpl : public BlobDB { // this handler is called. void OnFlushBeginHandler(DB* db, const FlushJobInfo& info); - // timer queue callback to close a file by appending a footer - // removes file from open files list - std::pair CloseSeqWrite(std::shared_ptr bfile, - bool aborted); - // is this file ready for Garbage collection. if the TTL of the file // has expired or if threshold of the file has been evicted // tt - current time @@ -308,8 +303,11 @@ class BlobDBImpl : public BlobDB { // collect all the blob log files from the blob directory Status GetAllLogFiles(std::set>* file_nums); - // appends a task into timer queue to close the file - void CloseIf(const std::shared_ptr& bfile); + // Close a file by appending a footer, and removes file from open files list. + Status CloseBlobFile(std::shared_ptr bfile); + + // Close a file if its size exceeds blob_file_size + Status CloseBlobFileIfNeeded(std::shared_ptr& bfile); uint64_t ExtractExpiration(const Slice& key, const Slice& value, Slice* value_slice, std::string* new_value); @@ -470,7 +468,7 @@ class BlobDBImpl : public BlobDB { // epoch or version of the open files. std::atomic epoch_of_; - // typically we keep 4 open blob files (simple i.e. no TTL) + // All opened non-TTL blob files. std::vector> open_simple_files_; // all the blob files which are currently being appended to based diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 8ec01698a..41c1482e7 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -185,7 +185,7 @@ TEST_F(BlobDBTest, PutWithTTL) { auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); - bdb_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); @@ -214,7 +214,7 @@ TEST_F(BlobDBTest, PutUntil) { auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); - bdb_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); @@ -246,7 +246,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_FALSE(blob_files[0]->HasTTL()); - bdb_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(0, gc_stats.num_deletes); @@ -291,7 +291,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); - bdb_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); auto &data = static_cast(ttl_extractor_.get())->data; @@ -338,7 +338,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); - bdb_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); auto &data = static_cast(ttl_extractor_.get())->data; @@ -395,7 +395,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); - bdb_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); @@ -592,7 +592,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) { } auto blob_files = blob_db_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); - blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); // Test for data in SST size_t new_keys = 0; for (int i = 0; i < 100; i++) { @@ -627,7 +627,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { static_cast_with_check(blob_db_); auto blob_files = blob_db_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); - blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); SyncPoint::GetInstance()->LoadDependency( {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate", @@ -663,7 +663,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { static_cast_with_check(blob_db_); auto blob_files = blob_db_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); - blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); mock_env_->set_now_micros(300 * 1000000); SyncPoint::GetInstance()->LoadDependency( @@ -708,7 +708,6 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) { ASSERT_EQ(11, blob_files.size()); ASSERT_TRUE(blob_files[0]->HasTTL()); ASSERT_TRUE(blob_files[0]->Immutable()); - blob_db_impl->TEST_CloseBlobFile(blob_files[0]); for (int i = 1; i <= 10; i++) { ASSERT_FALSE(blob_files[i]->HasTTL()); if (i < 10) { @@ -736,7 +735,7 @@ TEST_F(BlobDBTest, ReadWhileGC) { ASSERT_EQ(1, blob_files.size()); std::shared_ptr bfile = blob_files[0]; uint64_t bfile_number = bfile->BlobFileNumber(); - blob_db_impl->TEST_CloseBlobFile(bfile); + ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(bfile)); switch (i) { case 0: