diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 0488d9924..f74307e19 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1712,10 +1712,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, gcstats->blob_count++; bool del_this = false; + bool reloc_this = false; + + // TODO(yiwu): The following logic should use GetForUpdate() from + // optimistic transaction to check if the key is current, otherwise + // there can be another writer sneak in between sequence number of + // and the deletion. + // this particular TTL has expired if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) { del_this = true; - } else { + } else if (!first_gc) { SequenceNumber seq = kMaxSequenceNumber; bool found_record_for_key = false; SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd); @@ -1726,8 +1733,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } Status s1 = db_impl_->GetLatestSequenceForKey( sv, record.Key(), false, &seq, &found_record_for_key); - if (s1.IsNotFound() || (!found_record_for_key || seq != record.GetSN())) { - del_this = true; + if (found_record_for_key && seq == record.GetSN()) { + reloc_this = true; } db_impl_->ReturnAndCleanupSuperVersion(cfd, sv); } @@ -1749,77 +1756,76 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, gcstats->overrided_while_delete++; } delete txn; - continue; - } else if (first_gc) { - continue; } - if (!newfile) { - // new file - std::string reason("GC of "); - reason += bfptr->PathName(); - newfile = NewBlobFile(reason); - gcstats->newfile = newfile; + if (reloc_this) { + if (!newfile) { + // new file + std::string reason("GC of "); + reason += bfptr->PathName(); + newfile = NewBlobFile(reason); + gcstats->newfile = newfile; - new_writer = CheckOrCreateWriterLocked(newfile); - newfile->header_ = std::move(header); - // Can't use header beyond this point - newfile->header_valid_ = true; - newfile->file_size_ = BlobLogHeader::kHeaderSize; - s = new_writer->WriteHeader(newfile->header_); + new_writer = CheckOrCreateWriterLocked(newfile); + newfile->header_ = std::move(header); + // Can't use header beyond this point + newfile->header_valid_ = true; + newfile->file_size_ = BlobLogHeader::kHeaderSize; + s = new_writer->WriteHeader(newfile->header_); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "File: %s - header writing failed", - newfile->PathName().c_str()); - return s; + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "File: %s - header writing failed", + newfile->PathName().c_str()); + return s; + } + + WriteLock wl(&mutex_); + + dir_change_.store(true); + blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); } - WriteLock wl(&mutex_); + gcstats->num_relocs++; + std::string index_entry; - dir_change_.store(true); - blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); + uint64_t blob_offset = 0; + uint64_t key_offset = 0; + // write the blob to the blob log. + s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset, + &blob_offset, record.GetTTL()); + + BlobHandle handle; + handle.set_filenumber(newfile->BlobFileNumber()); + handle.set_size(record.Blob().size()); + handle.set_offset(blob_offset); + handle.set_compression(bdb_options_.compression); + handle.EncodeTo(&index_entry); + + new_writer->AddRecordFooter(record.GetSN()); + newfile->blob_count_++; + newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + + record.Blob().size() + BlobLogRecord::kFooterSize; + + Transaction* txn = opt_db_->BeginTransaction( + write_options_, OptimisticTransactionOptions(), nullptr); + txn->Put(cfh, record.Key(), index_entry); + Status s1 = txn->Commit(); + // chances that this Put will fail is low. If it fails, it would be + // because a new version of the key came in at this time, which will + // override the current version being iterated on. + if (s1.IsBusy()) { + ROCKS_LOG_INFO(db_options_.info_log, + "Optimistic transaction failed: %s put bn: %" PRIu32, + bfptr->PathName().c_str(), gcstats->blob_count); + } else { + gcstats->succ_relocs++; + ROCKS_LOG_DEBUG(db_options_.info_log, + "Successfully added put back into LSM: %s bn: %" PRIu32, + bfptr->PathName().c_str(), gcstats->blob_count); + } + delete txn; } - - gcstats->num_relocs++; - std::string index_entry; - - uint64_t blob_offset = 0; - uint64_t key_offset = 0; - // write the blob to the blob log. - s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset, - &blob_offset, record.GetTTL()); - - BlobHandle handle; - handle.set_filenumber(newfile->BlobFileNumber()); - handle.set_size(record.Blob().size()); - handle.set_offset(blob_offset); - handle.set_compression(bdb_options_.compression); - handle.EncodeTo(&index_entry); - - new_writer->AddRecordFooter(record.GetSN()); - newfile->blob_count_++; - newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + - record.Blob().size() + BlobLogRecord::kFooterSize; - - Transaction* txn = opt_db_->BeginTransaction( - write_options_, OptimisticTransactionOptions(), nullptr); - txn->Put(cfh, record.Key(), index_entry); - Status s1 = txn->Commit(); - // chances that this Put will fail is low. If it fails, it would be because - // a new version of the key came in at this time, which will override - // the current version being iterated on. - if (s1.IsBusy()) { - ROCKS_LOG_INFO(db_options_.info_log, - "Optimistic transaction failed: %s put bn: %" PRIu32, - bfptr->PathName().c_str(), gcstats->blob_count); - } else { - gcstats->succ_relocs++; - ROCKS_LOG_DEBUG(db_options_.info_log, - "Successfully added put back into LSM: %s bn: %" PRIu32, - bfptr->PathName().c_str(), gcstats->blob_count); - } - delete txn; } if (gcstats->newfile) total_blob_space_ += newfile->file_size_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index a5f979592..a3873729c 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -12,6 +12,7 @@ #include #include "db/db_test_util.h" #include "port/port.h" +#include "util/cast_util.h" #include "util/random.h" #include "util/string_util.h" #include "util/testharness.h" @@ -531,7 +532,8 @@ TEST_F(BlobDBTest, SequenceNumber) { bdb_options.disable_background_tasks = true; Open(bdb_options); SequenceNumber sequence = blob_db_->GetLatestSequenceNumber(); - BlobDBImpl *blob_db_impl = reinterpret_cast(blob_db_); + BlobDBImpl *blob_db_impl = + static_cast_with_check(blob_db_); for (int i = 0; i < 100; i++) { std::string key = "key" + ToString(i); PutRandom(key, &rnd); @@ -560,6 +562,44 @@ TEST_F(BlobDBTest, SequenceNumber) { } } +TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + BlobDBImpl *blob_db_impl = + static_cast_with_check(blob_db_); + DBImpl *db_impl = static_cast_with_check(blob_db_->GetBaseDB()); + std::map data; + for (int i = 0; i < 200; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + // Test for data in SST + size_t new_keys = 0; + for (int i = 0; i < 100; i++) { + if (rnd.Next() % 2 == 1) { + new_keys++; + PutRandom("key" + ToString(i), &rnd, &data); + } + } + db_impl->TEST_FlushMemTable(true /*wait*/); + // Test for data in memtable + for (int i = 100; i < 200; i++) { + if (rnd.Next() % 2 == 1) { + new_keys++; + PutRandom("key" + ToString(i), &rnd, &data); + } + } + GCStats gc_stats; + ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(0, gc_stats.num_deletes); + ASSERT_EQ(200 - new_keys, gc_stats.num_relocs); + VerifyDB(data); +} + } // namespace blob_db } // namespace rocksdb