Blob DB garbage collection should keep keys with newer version
Summary: Fix the bug where blob db garbage collection removes keys with a newer version. It shouldn't delete the key from the base db when the sequence number in the base db is not equal to the one in the blob log. Closes https://github.com/facebook/rocksdb/pull/2678 Differential Revision: D5549752 Pulled By: yiwu-arbug fbshipit-source-id: abb8649260963b5c389748023970fd746279d227
This commit is contained in:
parent
b14207c0c5
commit
1b33ee8e7b
@ -1716,10 +1716,17 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
gcstats->blob_count++;
|
gcstats->blob_count++;
|
||||||
|
|
||||||
bool del_this = false;
|
bool del_this = false;
|
||||||
|
bool reloc_this = false;
|
||||||
|
|
||||||
|
// TODO(yiwu): The following logic should use GetForUpdate() from
|
||||||
|
// optimistic transaction to check if the key is current, otherwise
|
||||||
|
// there can be another writer sneak in between sequence number of
|
||||||
|
// and the deletion.
|
||||||
|
|
||||||
// this particular TTL has expired
|
// this particular TTL has expired
|
||||||
if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) {
|
if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) {
|
||||||
del_this = true;
|
del_this = true;
|
||||||
} else {
|
} else if (!first_gc) {
|
||||||
SequenceNumber seq = kMaxSequenceNumber;
|
SequenceNumber seq = kMaxSequenceNumber;
|
||||||
bool found_record_for_key = false;
|
bool found_record_for_key = false;
|
||||||
SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd);
|
SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd);
|
||||||
@ -1730,8 +1737,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
}
|
}
|
||||||
Status s1 = db_impl_->GetLatestSequenceForKey(
|
Status s1 = db_impl_->GetLatestSequenceForKey(
|
||||||
sv, record.Key(), false, &seq, &found_record_for_key);
|
sv, record.Key(), false, &seq, &found_record_for_key);
|
||||||
if (s1.IsNotFound() || (!found_record_for_key || seq != record.GetSN())) {
|
if (found_record_for_key && seq == record.GetSN()) {
|
||||||
del_this = true;
|
reloc_this = true;
|
||||||
}
|
}
|
||||||
db_impl_->ReturnAndCleanupSuperVersion(cfd, sv);
|
db_impl_->ReturnAndCleanupSuperVersion(cfd, sv);
|
||||||
}
|
}
|
||||||
@ -1753,11 +1760,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
gcstats->overrided_while_delete++;
|
gcstats->overrided_while_delete++;
|
||||||
}
|
}
|
||||||
delete txn;
|
delete txn;
|
||||||
continue;
|
|
||||||
} else if (first_gc) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (reloc_this) {
|
||||||
if (!newfile) {
|
if (!newfile) {
|
||||||
// new file
|
// new file
|
||||||
std::string reason("GC of ");
|
std::string reason("GC of ");
|
||||||
@ -1806,13 +1811,13 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
|
newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
|
||||||
record.Blob().size() + BlobLogRecord::kFooterSize;
|
record.Blob().size() + BlobLogRecord::kFooterSize;
|
||||||
|
|
||||||
Transaction* txn = static_cast<OptimisticTransactionDB*>(opt_db_.get())
|
Transaction* txn = opt_db_->BeginTransaction(
|
||||||
->BeginTransaction(write_options_);
|
write_options_, OptimisticTransactionOptions(), nullptr);
|
||||||
txn->Put(cfh, record.Key(), index_entry);
|
txn->Put(cfh, record.Key(), index_entry);
|
||||||
Status s1 = txn->Commit();
|
Status s1 = txn->Commit();
|
||||||
// chances that this Put will fail is low. If it fails, it would be because
|
// chances that this Put will fail is low. If it fails, it would be
|
||||||
// a new version of the key came in at this time, which will override
|
// because a new version of the key came in at this time, which will
|
||||||
// the current version being iterated on.
|
// override the current version being iterated on.
|
||||||
if (s1.IsBusy()) {
|
if (s1.IsBusy()) {
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
ROCKS_LOG_INFO(db_options_.info_log,
|
||||||
"Optimistic transaction failed: %s put bn: %" PRIu32,
|
"Optimistic transaction failed: %s put bn: %" PRIu32,
|
||||||
@ -1825,6 +1830,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
}
|
}
|
||||||
delete txn;
|
delete txn;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (gcstats->newfile) total_blob_space_ += newfile->file_size_;
|
if (gcstats->newfile) total_blob_space_ += newfile->file_size_;
|
||||||
|
|
||||||
|
@ -531,7 +531,7 @@ TEST_F(BlobDBTest, SequenceNumber) {
|
|||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
SequenceNumber sequence = blob_db_->GetLatestSequenceNumber();
|
SequenceNumber sequence = blob_db_->GetLatestSequenceNumber();
|
||||||
BlobDBImpl *blob_db_impl = reinterpret_cast<BlobDBImpl *>(blob_db_);
|
BlobDBImpl *blob_db_impl = dynamic_cast<BlobDBImpl*>(blob_db_);
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
std::string key = "key" + ToString(i);
|
std::string key = "key" + ToString(i);
|
||||||
PutRandom(key, &rnd);
|
PutRandom(key, &rnd);
|
||||||
@ -560,6 +560,43 @@ TEST_F(BlobDBTest, SequenceNumber) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) {
|
||||||
|
Random rnd(301);
|
||||||
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.disable_background_tasks = true;
|
||||||
|
Open(bdb_options);
|
||||||
|
BlobDBImpl *blob_db_impl = dynamic_cast<BlobDBImpl*>(blob_db_);
|
||||||
|
DBImpl *db_impl = dynamic_cast<DBImpl*>(blob_db_->GetBaseDB());
|
||||||
|
std::map<std::string, std::string> data;
|
||||||
|
for (int i = 0; i < 200; i++) {
|
||||||
|
PutRandom("key" + ToString(i), &rnd, &data);
|
||||||
|
}
|
||||||
|
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
|
||||||
|
ASSERT_EQ(1, blob_files.size());
|
||||||
|
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
|
||||||
|
// Test for data in SST
|
||||||
|
size_t new_keys = 0;
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
if (rnd.Next() % 2 == 1) {
|
||||||
|
new_keys++;
|
||||||
|
PutRandom("key" + ToString(i), &rnd, &data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db_impl->TEST_FlushMemTable(true /*wait*/);
|
||||||
|
// Test for data in memtable
|
||||||
|
for (int i = 100; i < 200; i++) {
|
||||||
|
if (rnd.Next() % 2 == 1) {
|
||||||
|
new_keys++;
|
||||||
|
PutRandom("key" + ToString(i), &rnd, &data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GCStats gc_stats;
|
||||||
|
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
|
||||||
|
ASSERT_EQ(0, gc_stats.num_deletes);
|
||||||
|
ASSERT_EQ(200 - new_keys, gc_stats.num_relocs);
|
||||||
|
VerifyDB(data);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace blob_db
|
} // namespace blob_db
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user