From 1b33ee8e7be093a6cb4412f8e241ff25e5532721 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 3 Aug 2017 12:56:44 -0700 Subject: [PATCH] Blob DB garbage collection should keep keys with newer version Summary: Fix the bug where if blob db garbage collection revmoe keys with newer version. It shouldn't delete the key from base db when sequence number in base db is not equal to the one in blob log. Closes https://github.com/facebook/rocksdb/pull/2678 Differential Revision: D5549752 Pulled By: yiwu-arbug fbshipit-source-id: abb8649260963b5c389748023970fd746279d227 --- utilities/blob_db/blob_db_impl.cc | 138 ++++++++++++++++-------------- utilities/blob_db/blob_db_test.cc | 39 ++++++++- 2 files changed, 110 insertions(+), 67 deletions(-) diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index daaba3756..694500c25 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1716,10 +1716,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, gcstats->blob_count++; bool del_this = false; + bool reloc_this = false; + + // TODO(yiwu): The following logic should use GetForUpdate() from + // optimistic transaction to check if the key is current, otherwise + // there can be another writer sneak in between sequence number of + // and the deletion. + // this particular TTL has expired if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) { del_this = true; - } else { + } else if (!first_gc) { SequenceNumber seq = kMaxSequenceNumber; bool found_record_for_key = false; SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd); @@ -1730,8 +1737,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } Status s1 = db_impl_->GetLatestSequenceForKey( sv, record.Key(), false, &seq, &found_record_for_key); - if (s1.IsNotFound() || (!found_record_for_key || seq != record.GetSN())) { - del_this = true; + if (found_record_for_key && seq == record.GetSN()) { + reloc_this = true; } db_impl_->ReturnAndCleanupSuperVersion(cfd, sv); } @@ -1753,77 +1760,76 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, gcstats->overrided_while_delete++; } delete txn; - continue; - } else if (first_gc) { - continue; } - if (!newfile) { - // new file - std::string reason("GC of "); - reason += bfptr->PathName(); - newfile = NewBlobFile(reason); - gcstats->newfile = newfile; + if (reloc_this) { + if (!newfile) { + // new file + std::string reason("GC of "); + reason += bfptr->PathName(); + newfile = NewBlobFile(reason); + gcstats->newfile = newfile; - new_writer = CheckOrCreateWriterLocked(newfile); - newfile->header_ = std::move(header); - // Can't use header beyond this point - newfile->header_valid_ = true; - newfile->file_size_ = BlobLogHeader::kHeaderSize; - s = new_writer->WriteHeader(newfile->header_); + new_writer = CheckOrCreateWriterLocked(newfile); + newfile->header_ = std::move(header); + // Can't use header beyond this point + newfile->header_valid_ = true; + newfile->file_size_ = BlobLogHeader::kHeaderSize; + s = new_writer->WriteHeader(newfile->header_); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "File: %s - header writing failed", - newfile->PathName().c_str()); - return s; + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "File: %s - header writing failed", + newfile->PathName().c_str()); + return s; + } + + WriteLock wl(&mutex_); + + dir_change_.store(true); + blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); } - WriteLock wl(&mutex_); + gcstats->num_relocs++; + std::string index_entry; - dir_change_.store(true); - blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); + uint64_t blob_offset = 0; + uint64_t key_offset = 0; + // write the blob to the blob log. + s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset, + &blob_offset, record.GetTTL()); + + BlobHandle handle; + handle.set_filenumber(newfile->BlobFileNumber()); + handle.set_size(record.Blob().size()); + handle.set_offset(blob_offset); + handle.set_compression(bdb_options_.compression); + handle.EncodeTo(&index_entry); + + new_writer->AddRecordFooter(record.GetSN()); + newfile->blob_count_++; + newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + + record.Blob().size() + BlobLogRecord::kFooterSize; + + Transaction* txn = opt_db_->BeginTransaction( + write_options_, OptimisticTransactionOptions(), nullptr); + txn->Put(cfh, record.Key(), index_entry); + Status s1 = txn->Commit(); + // chances that this Put will fail is low. If it fails, it would be + // because a new version of the key came in at this time, which will + // override the current version being iterated on. + if (s1.IsBusy()) { + ROCKS_LOG_INFO(db_options_.info_log, + "Optimistic transaction failed: %s put bn: %" PRIu32, + bfptr->PathName().c_str(), gcstats->blob_count); + } else { + gcstats->succ_relocs++; + ROCKS_LOG_DEBUG(db_options_.info_log, + "Successfully added put back into LSM: %s bn: %" PRIu32, + bfptr->PathName().c_str(), gcstats->blob_count); + } + delete txn; } - - gcstats->num_relocs++; - std::string index_entry; - - uint64_t blob_offset = 0; - uint64_t key_offset = 0; - // write the blob to the blob log. - s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset, - &blob_offset, record.GetTTL()); - - BlobHandle handle; - handle.set_filenumber(newfile->BlobFileNumber()); - handle.set_size(record.Blob().size()); - handle.set_offset(blob_offset); - handle.set_compression(bdb_options_.compression); - handle.EncodeTo(&index_entry); - - new_writer->AddRecordFooter(record.GetSN()); - newfile->blob_count_++; - newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() + - record.Blob().size() + BlobLogRecord::kFooterSize; - - Transaction* txn = static_cast(opt_db_.get()) - ->BeginTransaction(write_options_); - txn->Put(cfh, record.Key(), index_entry); - Status s1 = txn->Commit(); - // chances that this Put will fail is low. If it fails, it would be because - // a new version of the key came in at this time, which will override - // the current version being iterated on. - if (s1.IsBusy()) { - ROCKS_LOG_INFO(db_options_.info_log, - "Optimistic transaction failed: %s put bn: %" PRIu32, - bfptr->PathName().c_str(), gcstats->blob_count); - } else { - gcstats->succ_relocs++; - ROCKS_LOG_DEBUG(db_options_.info_log, - "Successfully added put back into LSM: %s bn: %" PRIu32, - bfptr->PathName().c_str(), gcstats->blob_count); - } - delete txn; } if (gcstats->newfile) total_blob_space_ += newfile->file_size_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index a5f979592..5b3d5f5f4 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -531,7 +531,7 @@ TEST_F(BlobDBTest, SequenceNumber) { bdb_options.disable_background_tasks = true; Open(bdb_options); SequenceNumber sequence = blob_db_->GetLatestSequenceNumber(); - BlobDBImpl *blob_db_impl = reinterpret_cast(blob_db_); + BlobDBImpl *blob_db_impl = dynamic_cast(blob_db_); for (int i = 0; i < 100; i++) { std::string key = "key" + ToString(i); PutRandom(key, &rnd); @@ -560,6 +560,43 @@ TEST_F(BlobDBTest, SequenceNumber) { } } +TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + BlobDBImpl *blob_db_impl = dynamic_cast(blob_db_); + DBImpl *db_impl = dynamic_cast(blob_db_->GetBaseDB()); + std::map data; + for (int i = 0; i < 200; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + auto blob_files = blob_db_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + blob_db_impl->TEST_CloseBlobFile(blob_files[0]); + // Test for data in SST + size_t new_keys = 0; + for (int i = 0; i < 100; i++) { + if (rnd.Next() % 2 == 1) { + new_keys++; + PutRandom("key" + ToString(i), &rnd, &data); + } + } + db_impl->TEST_FlushMemTable(true /*wait*/); + // Test for data in memtable + for (int i = 100; i < 200; i++) { + if (rnd.Next() % 2 == 1) { + new_keys++; + PutRandom("key" + ToString(i), &rnd, &data); + } + } + GCStats gc_stats; + ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(0, gc_stats.num_deletes); + ASSERT_EQ(200 - new_keys, gc_stats.num_relocs); + VerifyDB(data); +} + } // namespace blob_db } // namespace rocksdb