diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index 8f1c3b027..0a5984c4a 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -29,6 +29,7 @@
 #include "util/logging.h"
 #include "util/mutexlock.h"
 #include "util/random.h"
+#include "util/sync_point.h"
 #include "util/timer_queue.h"
 #include "utilities/transactions/optimistic_transaction_db_impl.h"
 #include "utilities/transactions/optimistic_transaction_impl.h"
@@ -955,6 +956,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
 Status BlobDBImpl::PutUntil(const WriteOptions& options,
                             ColumnFamilyHandle* column_family,
                             const Slice& key, const Slice& value_unc,
                             uint64_t expiration) {
+  TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
   MutexLock l(&write_mutex_);
   UpdateWriteOptions(options);
@@ -1026,6 +1028,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
 
   CloseIf(bfile);
 
+  TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
   return s;
 }
@@ -1659,8 +1662,8 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
 // DELETED in the LSM
 ////////////////////////////////////////////////////////////////////////////////
 Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
-                                      GCStats* gcstats) {
-  uint64_t tt = EpochNow();
+                                      GCStats* gc_stats) {
+  uint64_t now = EpochNow();
 
   std::shared_ptr<Reader> reader =
       bfptr->OpenSequentialReader(env_, db_options_, env_options_);
@@ -1683,8 +1686,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
   bool first_gc = bfptr->gc_once_after_open_;
 
   ColumnFamilyHandle* cfh = bfptr->GetColumnFamily(db_);
-  auto cfhi = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh);
-  auto cfd = cfhi->cfd();
   bool has_ttl = header.HasTTL();
 
   // this reads the key but skips the blob
@@ -1692,7 +1693,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
 
   assert(opt_db_);
 
-  bool no_relocation_ttl = (has_ttl && tt > bfptr->GetTTLRange().second);
+  bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second);
 
   bool no_relocation_lsmdel = false;
   {
@@ -1711,136 +1712,199 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
   BlobLogRecord record;
   std::shared_ptr<BlobFile> newfile;
   std::shared_ptr<Writer> new_writer;
+  Transaction* transaction = nullptr;
+  uint64_t blob_offset = 0;
+  bool retry = false;
 
-  while (reader->ReadRecord(&record, shallow).ok()) {
-    gcstats->blob_count++;
+  static const WriteOptions kGarbageCollectionWriteOptions = []() {
+    WriteOptions write_options;
+    // TODO(yiwu): Disable WAL for garbage collection to make it compatible
+    // with use cases that don't use WAL. However, without WAL there are at
+    // least two issues in case of a crash:
+    // 1. If a key is dropped from the blob file (e.g. due to TTL) right
+    //    before a crash, the key may still be present in the LSM after
+    //    restart.
+    // 2. If a key is relocated to another blob file right before a crash,
+    //    after restart the new offset may be lost, with the old offset
+    //    pointing to the removed blob file.
+    // We need a better recovery mechanism to address these issues.
+    write_options.disableWAL = true;
+    // It is ok to ignore column families that were dropped.
+    write_options.ignore_missing_column_families = true;
+    return write_options;
+  }();
 
-    bool del_this = false;
-    bool reloc_this = false;
-
-    // TODO(yiwu): The following logic should use GetForUpdate() from
-    // optimistic transaction to check if the key is current, otherwise
-    // there can be another writer sneak in between sequence number of
-    // and the deletion.
-
-    // this particular TTL has expired
-    if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) {
-      del_this = true;
-    } else if (!first_gc) {
-      SequenceNumber seq = kMaxSequenceNumber;
-      bool found_record_for_key = false;
-      SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd);
-      if (sv == nullptr) {
-        Status result =
-            Status::InvalidArgument("Could not access column family 0");
-        return result;
+  while (true) {
+    assert(s.ok());
+    if (retry) {
+      // Retry in case the transaction fails with Status::TryAgain.
+      retry = false;
+    } else {
+      // Read the next blob record.
+      Status read_record_status =
+          reader->ReadRecord(&record, shallow, &blob_offset);
+      // Exit if we reach the end of the blob file.
+      // TODO(yiwu): properly handle ReadRecord error.
+      if (!read_record_status.ok()) {
+        break;
       }
-      Status s1 = db_impl_->GetLatestSequenceForKey(
-          sv, record.Key(), false, &seq, &found_record_for_key);
-      if (found_record_for_key && seq == record.GetSN()) {
-        reloc_this = true;
-      }
-      db_impl_->ReturnAndCleanupSuperVersion(cfd, sv);
+      gc_stats->blob_count++;
     }
 
-    if (del_this) {
-      gcstats->num_deletes++;
-      gcstats->deleted_size += record.GetBlobSize();
-      if (first_gc) continue;
+    transaction =
+        opt_db_->BeginTransaction(kGarbageCollectionWriteOptions,
+                                  OptimisticTransactionOptions(), transaction);
 
-      Transaction* txn = static_cast<OptimisticTransactionDB*>(opt_db_.get())
-                             ->BeginTransaction(write_options_);
-      txn->Delete(cfh, record.Key());
-      Status s1 = txn->Commit();
-      // chances that this DELETE will fail is low. If it fails, it would be
-      // because a new version of the key came in at this time, which will
-      // override the current version being iterated on.
-      if (!s1.IsBusy()) {
-        // assume that failures happen due to new writes.
-        gcstats->overrided_while_delete++;
-      }
-      delete txn;
+    std::string index_entry;
+    Status get_status = transaction->GetForUpdate(ReadOptions(), cfh,
+                                                  record.Key(), &index_entry);
+    TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate");
+    if (get_status.IsNotFound()) {
+      // Key has been deleted. Drop the blob record.
+      continue;
+    }
+    if (!get_status.ok()) {
+      s = get_status;
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Error while getting index entry: %s",
+                      s.ToString().c_str());
+      break;
     }
 
-    if (reloc_this) {
-      if (!newfile) {
-        // new file
-        std::string reason("GC of ");
-        reason += bfptr->PathName();
-        newfile = NewBlobFile(reason);
-        gcstats->newfile = newfile;
+    // TODO(yiwu): We should have an override of GetForUpdate returning a
+    // PinnableSlice.
+    Slice index_entry_slice(index_entry);
+    BlobHandle handle;
+    s = handle.DecodeFrom(&index_entry_slice);
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Error while decoding index entry: %s",
+                      s.ToString().c_str());
+      break;
+    }
+    if (handle.filenumber() != bfptr->BlobFileNumber() ||
+        handle.offset() != blob_offset) {
+      // Key has been overwritten. Drop the blob record.
+      continue;
+    }
 
-        new_writer = CheckOrCreateWriterLocked(newfile);
-        newfile->header_ = std::move(header);
-        // Can't use header beyond this point
-        newfile->header_valid_ = true;
-        newfile->file_size_ = BlobLogHeader::kHeaderSize;
-        s = new_writer->WriteHeader(newfile->header_);
-
-        if (!s.ok()) {
-          ROCKS_LOG_ERROR(db_options_.info_log,
-                          "File: %s - header writing failed",
-                          newfile->PathName().c_str());
-          return s;
-        }
-
-        WriteLock wl(&mutex_);
-
-        dir_change_.store(true);
-        blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
-      }
-
-      gcstats->num_relocs++;
-      std::string index_entry;
-
-      uint64_t blob_offset = 0;
-      uint64_t key_offset = 0;
-      // write the blob to the blob log.
-      s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset,
-                                &blob_offset, record.GetTTL());
-
-      BlobHandle handle;
-      handle.set_filenumber(newfile->BlobFileNumber());
-      handle.set_size(record.Blob().size());
-      handle.set_offset(blob_offset);
-      handle.set_compression(bdb_options_.compression);
-      handle.EncodeTo(&index_entry);
-
-      new_writer->AddRecordFooter(record.GetSN());
-      newfile->blob_count_++;
-      newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
-                             record.Blob().size() + BlobLogRecord::kFooterSize;
-
-      Transaction* txn = opt_db_->BeginTransaction(
-          write_options_, OptimisticTransactionOptions(), nullptr);
-      txn->Put(cfh, record.Key(), index_entry);
-      Status s1 = txn->Commit();
-      // chances that this Put will fail is low. If it fails, it would be
-      // because a new version of the key came in at this time, which will
-      // override the current version being iterated on.
-      if (s1.IsBusy()) {
-        ROCKS_LOG_INFO(db_options_.info_log,
-                       "Optimistic transaction failed: %s put bn: %" PRIu32,
-                       bfptr->PathName().c_str(), gcstats->blob_count);
+    // If the key has expired, remove it from the base DB.
+    if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) {
+      gc_stats->num_deletes++;
+      gc_stats->deleted_size += record.GetBlobSize();
+      TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
+      transaction->Delete(cfh, record.Key());
+      Status delete_status = transaction->Commit();
+      if (delete_status.ok()) {
+        gc_stats->delete_succeeded++;
+      } else if (delete_status.IsBusy()) {
+        // The key was overwritten in the meantime. Drop the blob record.
+        gc_stats->overwritten_while_delete++;
+      } else if (delete_status.IsTryAgain()) {
+        // Retry the transaction.
+        retry = true;
       } else {
-        gcstats->succ_relocs++;
-        ROCKS_LOG_DEBUG(db_options_.info_log,
-                        "Successfully added put back into LSM: %s bn: %" PRIu32,
-                        bfptr->PathName().c_str(), gcstats->blob_count);
+        // We hit an error.
+        s = delete_status;
+        ROCKS_LOG_ERROR(db_options_.info_log,
+                        "Error while deleting expired key: %s",
+                        s.ToString().c_str());
+        break;
       }
-      delete txn;
+      // Continue to the next blob record, or retry.
+      continue;
     }
+
+    if (first_gc) {
+      // Do not relocate the blob record for initial GC.
+      continue;
+    }
+
+    // Relocate the blob record to a new file.
+    if (!newfile) {
+      // new file
+      std::string reason("GC of ");
+      reason += bfptr->PathName();
+      newfile = NewBlobFile(reason);
+      gc_stats->newfile = newfile;
+
+      new_writer = CheckOrCreateWriterLocked(newfile);
+      newfile->header_ = std::move(header);
+      // Can't use header beyond this point
+      newfile->header_valid_ = true;
+      newfile->file_size_ = BlobLogHeader::kHeaderSize;
+      s = new_writer->WriteHeader(newfile->header_);
+
+      if (!s.ok()) {
+        ROCKS_LOG_ERROR(db_options_.info_log,
+                        "File: %s - header writing failed",
+                        newfile->PathName().c_str());
+        break;
+      }
+
+      WriteLock wl(&mutex_);
+
+      dir_change_.store(true);
+      blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
+    }
+
+    gc_stats->num_relocate++;
+    std::string new_index_entry;
+
+    uint64_t new_blob_offset = 0;
+    uint64_t new_key_offset = 0;
+    // write the blob to the blob log.
+    s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset,
+                              &new_blob_offset, record.GetTTL());
+
+    BlobHandle new_handle;
+    new_handle.set_filenumber(newfile->BlobFileNumber());
+    new_handle.set_size(record.Blob().size());
+    new_handle.set_offset(new_blob_offset);
+    new_handle.set_compression(bdb_options_.compression);
+    new_handle.EncodeTo(&new_index_entry);
+
+    new_writer->AddRecordFooter(record.GetSN());
+    newfile->blob_count_++;
+    newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
+                           record.Blob().size() + BlobLogRecord::kFooterSize;
+
+    TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
+    transaction->Put(cfh, record.Key(), new_index_entry);
+    Status put_status = transaction->Commit();
+    if (put_status.ok()) {
+      gc_stats->relocate_succeeded++;
+    } else if (put_status.IsBusy()) {
+      // The key was overwritten in the meantime. Drop the blob record.
+      gc_stats->overwritten_while_relocate++;
+    } else if (put_status.IsTryAgain()) {
+      // Retry the transaction.
+      // TODO(yiwu): On retry, we can reuse the new blob record.
+      retry = true;
+    } else {
+      // We hit an error.
+      s = put_status;
+      ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
+                      s.ToString().c_str());
+      break;
+    }
+  }  // end of ReadRecord loop
+
+  if (transaction != nullptr) {
+    delete transaction;
+  }
+  ROCKS_LOG_INFO(
+      db_options_.info_log,
+      "%s blob file %" PRIu64
+      ". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64
+      " succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.",
"Successfully garbage collected" : "Failed to garbage collect", + bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->delete_succeeded, + gc_stats->num_deletes, gc_stats->relocate_succeeded, + gc_stats->num_relocate); + if (newfile != nullptr) { + total_blob_space_ += newfile->file_size_; + ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", + newfile->BlobFileNumber()); } - - if (gcstats->newfile) total_blob_space_ += newfile->file_size_; - - ROCKS_LOG_INFO(db_options_.info_log, - "File: %s Num deletes %" PRIu32 " Num relocs: %" PRIu32 - " Succ Deletes: %" PRIu32 " Succ relocs: %" PRIu32, - bfptr->PathName().c_str(), gcstats->num_deletes, - gcstats->num_relocs, gcstats->succ_deletes_lsm, - gcstats->succ_relocs); - return s; } @@ -2123,15 +2187,17 @@ std::pair BlobDBImpl::RunGC(bool aborted) { // in this collect the set of files, which became obsolete std::vector> obsoletes; for (auto bfile : to_process) { - GCStats gcstats; - Status s = GCFileAndUpdateLSM(bfile, &gcstats); - if (!s.ok()) continue; + GCStats gc_stats; + Status s = GCFileAndUpdateLSM(bfile, &gc_stats); + if (!s.ok()) { + continue; + } if (bfile->gc_once_after_open_.load()) { WriteLock lockbfile_w(&bfile->mutex_); - bfile->deleted_size_ = gcstats.deleted_size; - bfile->deleted_count_ = gcstats.num_deletes; + bfile->deleted_size_ = gc_stats.deleted_size; + bfile->deleted_count_ = gc_stats.num_deletes; bfile->gc_once_after_open_ = false; } else { obsoletes.push_back(bfile); diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index d812604be..6247fa22b 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -137,10 +137,13 @@ struct GCStats { uint64_t blob_count = 0; uint64_t num_deletes = 0; uint64_t deleted_size = 0; - uint64_t num_relocs = 0; - uint64_t succ_deletes_lsm = 0; - uint64_t overrided_while_delete = 0; - uint64_t succ_relocs = 0; + uint64_t retry_delete = 0; + uint64_t delete_succeeded = 0; + uint64_t overwritten_while_delete = 0; + uint64_t num_relocate = 0; + uint64_t retry_relocate = 0; + uint64_t relocate_succeeded = 0; + uint64_t overwritten_while_relocate = 0; std::shared_ptr newfile = nullptr; }; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 829a7819d..4839a6afc 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -14,6 +14,7 @@ #include "port/port.h" #include "util/random.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "utilities/blob_db/blob_db_impl.h" @@ -176,7 +177,7 @@ TEST_F(BlobDBTest, PutWithTTL) { for (size_t i = 0; i < 100; i++) { uint64_t ttl = rnd.Next() % 100; PutRandomWithTTL("key" + ToString(i), ttl, &rnd, - (ttl < 50 ? nullptr : &data)); + (ttl <= 50 ? nullptr : &data)); } mock_env_->set_now_micros(100 * 1000000); auto *bdb_impl = static_cast(blob_db_); @@ -187,7 +188,7 @@ TEST_F(BlobDBTest, PutWithTTL) { GCStats gc_stats; ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); ASSERT_EQ(100 - data.size(), gc_stats.num_deletes); - ASSERT_EQ(data.size(), gc_stats.num_relocs); + ASSERT_EQ(data.size(), gc_stats.num_relocate); VerifyDB(data); } @@ -205,7 +206,7 @@ TEST_F(BlobDBTest, PutUntil) { for (size_t i = 0; i < 100; i++) { uint64_t expiration = rnd.Next() % 100 + 50; PutRandomUntil("key" + ToString(i), expiration, &rnd, - (expiration < 100 ? nullptr : &data)); + (expiration <= 100 ? 
   }
   mock_env_->set_now_micros(100 * 1000000);
   auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
@@ -216,7 +217,7 @@ TEST_F(BlobDBTest, PutUntil) {
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
-  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  ASSERT_EQ(data.size(), gc_stats.num_relocate);
   VerifyDB(data);
 }
@@ -248,7 +249,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(0, gc_stats.num_deletes);
-  ASSERT_EQ(100, gc_stats.num_relocs);
+  ASSERT_EQ(100, gc_stats.num_relocate);
   VerifyDB(data);
 }
@@ -262,7 +263,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
                   std::string * /*new_value*/,
                   bool * /*value_changed*/) override {
       *ttl = rnd->Next() % 100;
-      if (*ttl >= 50) {
+      if (*ttl > 50) {
        data[key.ToString()] = value.ToString();
       }
       return true;
@@ -294,7 +295,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
-  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  ASSERT_EQ(data.size(), gc_stats.num_relocate);
   VerifyDB(data);
 }
@@ -309,7 +310,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
                   std::string * /*new_value*/,
                   bool * /*value_changed*/) override {
       *expiration = rnd->Next() % 100 + 50;
-      if (*expiration >= 100) {
+      if (*expiration > 100) {
        data[key.ToString()] = value.ToString();
       }
       return true;
@@ -341,7 +342,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
-  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  ASSERT_EQ(data.size(), gc_stats.num_relocate);
   VerifyDB(data);
 }
@@ -384,7 +385,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
     std::string value_ttl = value + "ttl:";
     PutFixed64(&value_ttl, ttl);
     ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl)));
-    if (ttl >= 50) {
+    if (ttl > 50) {
       data[key] = value;
     }
   }
@@ -397,7 +398,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
   GCStats gc_stats;
   ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
   ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
-  ASSERT_EQ(data.size(), gc_stats.num_relocs);
+  ASSERT_EQ(data.size(), gc_stats.num_relocate);
   VerifyDB(data);
 }
@@ -533,9 +534,7 @@ TEST_F(BlobDBTest, MultipleWriters) {
         i));
   std::map<std::string, std::string> data;
   for (size_t i = 0; i < 10; i++) {
-    if (workers[i].joinable()) {
-      workers[i].join();
-    }
+    workers[i].join();
     data.insert(data_set[i].begin(), data_set[i].end());
   }
   VerifyDB(data);
@@ -577,7 +576,7 @@ TEST_F(BlobDBTest, SequenceNumber) {
   }
 }
 
-TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) {
+TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
   Random rnd(301);
   BlobDBOptions bdb_options;
   bdb_options.disable_background_tasks = true;
@@ -609,11 +608,81 @@
   }
   GCStats gc_stats;
   ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(200, gc_stats.blob_count);
   ASSERT_EQ(0, gc_stats.num_deletes);
-  ASSERT_EQ(200 - new_keys, gc_stats.num_relocs);
+  ASSERT_EQ(200 - new_keys, gc_stats.num_relocate);
   VerifyDB(data);
 }
 
+TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
+  Random rnd(301);
+  BlobDBOptions bdb_options;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options);
+  ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
+  BlobDBImpl *blob_db_impl = dynamic_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
+        "BlobDBImpl::PutUntil:Start"},
+       {"BlobDBImpl::PutUntil:Finish",
+        "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  auto writer = port::Thread(
+      [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
+
+  GCStats gc_stats;
+  ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(1, gc_stats.blob_count);
+  ASSERT_EQ(0, gc_stats.num_deletes);
+  ASSERT_EQ(1, gc_stats.num_relocate);
+  ASSERT_EQ(0, gc_stats.relocate_succeeded);
+  ASSERT_EQ(1, gc_stats.overwritten_while_relocate);
+  writer.join();
+  VerifyDB({{"foo", "v2"}});
+}
+
+TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
+  Random rnd(301);
+  Options options;
+  options.env = mock_env_.get();
+  BlobDBOptions bdb_options;
+  bdb_options.disable_background_tasks = true;
+  Open(bdb_options, options);
+  mock_env_->set_now_micros(100 * 1000000);
+  ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
+  BlobDBImpl *blob_db_impl = dynamic_cast<BlobDBImpl *>(blob_db_);
+  auto blob_files = blob_db_impl->TEST_GetBlobFiles();
+  ASSERT_EQ(1, blob_files.size());
+  blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
+  mock_env_->set_now_micros(300 * 1000000);
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
+        "BlobDBImpl::PutUntil:Start"},
+       {"BlobDBImpl::PutUntil:Finish",
+        "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  auto writer = port::Thread([this]() {
+    ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400));
+  });
+
+  GCStats gc_stats;
+  ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+  ASSERT_EQ(1, gc_stats.blob_count);
+  ASSERT_EQ(1, gc_stats.num_deletes);
+  ASSERT_EQ(0, gc_stats.delete_succeeded);
+  ASSERT_EQ(1, gc_stats.overwritten_while_delete);
+  ASSERT_EQ(0, gc_stats.num_relocate);
+  writer.join();
+  VerifyDB({{"foo", "v2"}});
+}
+
 }  // namespace blob_db
 }  // namespace rocksdb
diff --git a/utilities/blob_db/blob_log_reader.cc b/utilities/blob_db/blob_log_reader.cc
index 3931c8669..75afab2e7 100644
--- a/utilities/blob_db/blob_log_reader.cc
+++ b/utilities/blob_db/blob_log_reader.cc
@@ -41,7 +41,7 @@ Status Reader::ReadHeader(BlobLogHeader* header) {
 }
 
 Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
-                          WALRecoveryMode wal_recovery_mode) {
+                          uint64_t* blob_offset) {
   record->Clear();
   buffer_.clear();
   backing_store_[0] = '\0';
@@ -65,6 +65,9 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
       header_crc = crc32c::Extend(header_crc, buffer_.data(), crc_data_size);
 
   uint64_t kb_size = record->GetKeySize() + record->GetBlobSize();
+  if (blob_offset != nullptr) {
+    *blob_offset = next_byte_ + record->GetKeySize();
+  }
   switch (level) {
     case kReadHdrFooter:
       file_->Skip(kb_size);
diff --git a/utilities/blob_db/blob_log_reader.h b/utilities/blob_db/blob_log_reader.h
index 05f53fe93..5522ec3a2 100644
--- a/utilities/blob_db/blob_log_reader.h
+++ b/utilities/blob_db/blob_log_reader.h
@@ -60,9 +60,9 @@ class Reader {
   // "*scratch" as temporary storage.  The contents filled in *record
   // will only be valid until the next mutating operation on this
   // reader or the next mutation to *scratch.
+  // If blob_offset is non-null, return the offset of the blob through it.
   Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHdrFooter,
-                    WALRecoveryMode wal_recovery_mode =
-                        WALRecoveryMode::kTolerateCorruptedTailRecords);
+                    uint64_t* blob_offset = nullptr);
 
   SequentialFileReader* file() { return file_.get(); }
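
Reviewer note (not part of the patch): the core of this change is the optimistic-transaction handshake between GC and foreground writers. GetForUpdate() records a read of the key, and Commit() fails with Status::Busy if a writer updated the key in between, in which case GC simply drops its stale blob record instead of clobbering the new value. Below is a minimal standalone sketch of that conflict-check loop, under the assumption of an already-open OptimisticTransactionDB* and ColumnFamilyHandle*; RelocateWithConflictCheck is a hypothetical helper name for illustration, not part of BlobDBImpl.

#include <string>

#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/transaction.h"

using namespace rocksdb;

// Hypothetical helper: write a new index entry for `key`, dropping the update
// if a concurrent writer overwrites or deletes the key first.
Status RelocateWithConflictCheck(OptimisticTransactionDB* otdb,
                                 ColumnFamilyHandle* cfh, const Slice& key,
                                 const Slice& new_index_entry) {
  WriteOptions write_options;
  Transaction* txn = nullptr;
  Status s;
  while (true) {
    // Reuse the transaction object across retries, as the GC loop above does.
    txn = otdb->BeginTransaction(write_options, OptimisticTransactionOptions(),
                                 txn);
    std::string existing;
    // GetForUpdate() records the read so that Commit() can detect conflicts.
    s = txn->GetForUpdate(ReadOptions(), cfh, key, &existing);
    if (!s.ok()) {
      // IsNotFound(): the key was deleted concurrently; otherwise a real error.
      break;
    }
    txn->Put(cfh, key, new_index_entry);
    s = txn->Commit();
    // IsBusy(): a concurrent writer won the conflict; drop this relocation.
    // IsTryAgain(): the commit could not validate; safe to retry the loop.
    if (!s.IsTryAgain()) {
      break;
    }
  }
  delete txn;
  return s;
}

The Status::Busy outcome of Commit() is the same signal the new tests observe via gc_stats.overwritten_while_relocate and gc_stats.overwritten_while_delete, using the sync points added in PutUntil() to force the writer in between GetForUpdate() and Commit().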