Fix blob DB transaction usage while GC

Summary:
While GC, blob DB use optimistic transaction to delete or replace the index entry in LSM, to guarantee correctness if there's a normal write writing to the same key. However, the previous implementation doesn't call SetSnapshot() nor use GetForUpdate() of transaction API, instead it do its own sequence number checking before beginning the transaction. A normal write can sneak in after the sequence number check and overwrite the key, and the GC will delete or relocate the old version of the key by mistake. Update the code to property use GetForUpdate() to check the existing index entry.

After the patch the sequence number store with each blob record is useless, So I'm considering remove the sequence number from blob record, in another patch.
Closes https://github.com/facebook/rocksdb/pull/2703

Differential Revision: D5589178

Pulled By: yiwu-arbug

fbshipit-source-id: 8dc960cd5f4e61b36024ba7c32d05584ce149c24
This commit is contained in:
yiwu-arbug 2017-08-11 12:30:02 -07:00 committed by Yi Wu
parent b024e77f49
commit 672efad99a
5 changed files with 291 additions and 150 deletions

View File

@ -29,6 +29,7 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/sync_point.h"
#include "util/timer_queue.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "utilities/transactions/optimistic_transaction_impl.h"
@ -955,6 +956,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
Status BlobDBImpl::PutUntil(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value_unc, uint64_t expiration) {
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
MutexLock l(&write_mutex_);
UpdateWriteOptions(options);
@ -1026,6 +1028,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
CloseIf(bfile);
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
return s;
}
@ -1659,8 +1662,8 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
// DELETED in the LSM
////////////////////////////////////////////////////////////////////////////////
Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gcstats) {
uint64_t tt = EpochNow();
GCStats* gc_stats) {
uint64_t now = EpochNow();
std::shared_ptr<Reader> reader =
bfptr->OpenSequentialReader(env_, db_options_, env_options_);
@ -1683,8 +1686,6 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool first_gc = bfptr->gc_once_after_open_;
ColumnFamilyHandle* cfh = bfptr->GetColumnFamily(db_);
auto cfhi = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh);
auto cfd = cfhi->cfd();
bool has_ttl = header.HasTTL();
// this reads the key but skips the blob
@ -1692,7 +1693,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
assert(opt_db_);
bool no_relocation_ttl = (has_ttl && tt > bfptr->GetTTLRange().second);
bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second);
bool no_relocation_lsmdel = false;
{
@ -1711,136 +1712,199 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
BlobLogRecord record;
std::shared_ptr<BlobFile> newfile;
std::shared_ptr<Writer> new_writer;
Transaction* transaction = nullptr;
uint64_t blob_offset = 0;
bool retry = false;
while (reader->ReadRecord(&record, shallow).ok()) {
gcstats->blob_count++;
static const WriteOptions kGarbageCollectionWriteOptions = []() {
WriteOptions write_options;
// TODO(yiwu): Disable WAL for garbage colection to make it compatible with
// use cases that don't use WAL. However without WAL there are at least
// two issues with crash:
// 1. If a key is dropped from blob file (e.g. due to TTL), right before a
// crash, the key may still presents in LSM after restart.
// 2. If a key is relocated to another blob file, right before a crash,
// after restart the new offset may be lost with the old offset pointing
// to the removed blob file.
// We need to have better recovery mechanism to address these issues.
write_options.disableWAL = true;
// It is ok to ignore column families that were dropped.
write_options.ignore_missing_column_families = true;
return write_options;
}();
bool del_this = false;
bool reloc_this = false;
// TODO(yiwu): The following logic should use GetForUpdate() from
// optimistic transaction to check if the key is current, otherwise
// there can be another writer sneak in between sequence number of
// and the deletion.
// this particular TTL has expired
if (no_relocation_ttl || (has_ttl && tt > record.GetTTL())) {
del_this = true;
} else if (!first_gc) {
SequenceNumber seq = kMaxSequenceNumber;
bool found_record_for_key = false;
SuperVersion* sv = db_impl_->GetAndRefSuperVersion(cfd);
if (sv == nullptr) {
Status result =
Status::InvalidArgument("Could not access column family 0");
return result;
while (true) {
assert(s.ok());
if (retry) {
// Retry in case transaction fail with Status::TryAgain.
retry = false;
} else {
// Read the next blob record.
Status read_record_status =
reader->ReadRecord(&record, shallow, &blob_offset);
// Exit if we reach the end of blob file.
// TODO(yiwu): properly handle ReadRecord error.
if (!read_record_status.ok()) {
break;
}
Status s1 = db_impl_->GetLatestSequenceForKey(
sv, record.Key(), false, &seq, &found_record_for_key);
if (found_record_for_key && seq == record.GetSN()) {
reloc_this = true;
}
db_impl_->ReturnAndCleanupSuperVersion(cfd, sv);
gc_stats->blob_count++;
}
if (del_this) {
gcstats->num_deletes++;
gcstats->deleted_size += record.GetBlobSize();
if (first_gc) continue;
transaction =
opt_db_->BeginTransaction(kGarbageCollectionWriteOptions,
OptimisticTransactionOptions(), transaction);
Transaction* txn = static_cast<OptimisticTransactionDB*>(opt_db_.get())
->BeginTransaction(write_options_);
txn->Delete(cfh, record.Key());
Status s1 = txn->Commit();
// chances that this DELETE will fail is low. If it fails, it would be
// because a new version of the key came in at this time, which will
// override the current version being iterated on.
if (!s1.IsBusy()) {
// assume that failures happen due to new writes.
gcstats->overrided_while_delete++;
}
delete txn;
std::string index_entry;
Status get_status = transaction->GetForUpdate(ReadOptions(), cfh,
record.Key(), &index_entry);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate");
if (get_status.IsNotFound()) {
// Key has been deleted. Drop the blob record.
continue;
}
if (!get_status.ok()) {
s = get_status;
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while getting index entry: %s",
s.ToString().c_str());
break;
}
if (reloc_this) {
if (!newfile) {
// new file
std::string reason("GC of ");
reason += bfptr->PathName();
newfile = NewBlobFile(reason);
gcstats->newfile = newfile;
// TODO(yiwu): We should have an override of GetForUpdate returning a
// PinnableSlice.
Slice index_entry_slice(index_entry);
BlobHandle handle;
s = handle.DecodeFrom(&index_entry_slice);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while decoding index entry: %s",
s.ToString().c_str());
break;
}
if (handle.filenumber() != bfptr->BlobFileNumber() ||
handle.offset() != blob_offset) {
// Key has been overwritten. Drop the blob record.
continue;
}
new_writer = CheckOrCreateWriterLocked(newfile);
newfile->header_ = std::move(header);
// Can't use header beyond this point
newfile->header_valid_ = true;
newfile->file_size_ = BlobLogHeader::kHeaderSize;
s = new_writer->WriteHeader(newfile->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File: %s - header writing failed",
newfile->PathName().c_str());
return s;
}
WriteLock wl(&mutex_);
dir_change_.store(true);
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
}
gcstats->num_relocs++;
std::string index_entry;
uint64_t blob_offset = 0;
uint64_t key_offset = 0;
// write the blob to the blob log.
s = new_writer->AddRecord(record.Key(), record.Blob(), &key_offset,
&blob_offset, record.GetTTL());
BlobHandle handle;
handle.set_filenumber(newfile->BlobFileNumber());
handle.set_size(record.Blob().size());
handle.set_offset(blob_offset);
handle.set_compression(bdb_options_.compression);
handle.EncodeTo(&index_entry);
new_writer->AddRecordFooter(record.GetSN());
newfile->blob_count_++;
newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
record.Blob().size() + BlobLogRecord::kFooterSize;
Transaction* txn = opt_db_->BeginTransaction(
write_options_, OptimisticTransactionOptions(), nullptr);
txn->Put(cfh, record.Key(), index_entry);
Status s1 = txn->Commit();
// chances that this Put will fail is low. If it fails, it would be
// because a new version of the key came in at this time, which will
// override the current version being iterated on.
if (s1.IsBusy()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Optimistic transaction failed: %s put bn: %" PRIu32,
bfptr->PathName().c_str(), gcstats->blob_count);
// If key has expired, remove it from base DB.
if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) {
gc_stats->num_deletes++;
gc_stats->deleted_size += record.GetBlobSize();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
transaction->Delete(cfh, record.Key());
Status delete_status = transaction->Commit();
if (delete_status.ok()) {
gc_stats->delete_succeeded++;
} else if (delete_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->overwritten_while_delete++;
} else if (delete_status.IsTryAgain()) {
// Retry the transaction.
retry = true;
} else {
gcstats->succ_relocs++;
ROCKS_LOG_DEBUG(db_options_.info_log,
"Successfully added put back into LSM: %s bn: %" PRIu32,
bfptr->PathName().c_str(), gcstats->blob_count);
// We hit an error.
s = delete_status;
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while deleting expired key: %s",
s.ToString().c_str());
break;
}
delete txn;
// Continue to next blob record or retry.
continue;
}
if (first_gc) {
// Do not relocate blob record for initial GC.
continue;
}
// Relocate the blob record to new file.
if (!newfile) {
// new file
std::string reason("GC of ");
reason += bfptr->PathName();
newfile = NewBlobFile(reason);
gc_stats->newfile = newfile;
new_writer = CheckOrCreateWriterLocked(newfile);
newfile->header_ = std::move(header);
// Can't use header beyond this point
newfile->header_valid_ = true;
newfile->file_size_ = BlobLogHeader::kHeaderSize;
s = new_writer->WriteHeader(newfile->header_);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"File: %s - header writing failed",
newfile->PathName().c_str());
break;
}
WriteLock wl(&mutex_);
dir_change_.store(true);
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
}
gc_stats->num_relocate++;
std::string new_index_entry;
uint64_t new_blob_offset = 0;
uint64_t new_key_offset = 0;
// write the blob to the blob log.
s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset,
&new_blob_offset, record.GetTTL());
BlobHandle new_handle;
new_handle.set_filenumber(newfile->BlobFileNumber());
new_handle.set_size(record.Blob().size());
new_handle.set_offset(new_blob_offset);
new_handle.set_compression(bdb_options_.compression);
new_handle.EncodeTo(&new_index_entry);
new_writer->AddRecordFooter(record.GetSN());
newfile->blob_count_++;
newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
record.Blob().size() + BlobLogRecord::kFooterSize;
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
transaction->Put(cfh, record.Key(), new_index_entry);
Status put_status = transaction->Commit();
if (put_status.ok()) {
gc_stats->relocate_succeeded++;
} else if (put_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->overwritten_while_relocate++;
} else if (put_status.IsTryAgain()) {
// Retry the transaction.
// TODO(yiwu): On retry, we can reuse the new blob record.
retry = true;
} else {
// We hit an error.
s = put_status;
ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
s.ToString().c_str());
break;
}
} // end of ReadRecord loop
if (transaction != nullptr) {
delete transaction;
}
ROCKS_LOG_INFO(
db_options_.info_log, "%s blob file %" PRIu64 ".",
". Total blob records: %" PRIu64 ", Deletes: %" PRIu64 "/%" PRIu64
" succeeded, Relocates: %" PRIu64 "/%" PRIu64 " succeeded.",
s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->delete_succeeded,
gc_stats->num_deletes, gc_stats->relocate_succeeded,
gc_stats->num_relocate);
if (newfile != nullptr) {
total_blob_space_ += newfile->file_size_;
ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
newfile->BlobFileNumber());
}
if (gcstats->newfile) total_blob_space_ += newfile->file_size_;
ROCKS_LOG_INFO(db_options_.info_log,
"File: %s Num deletes %" PRIu32 " Num relocs: %" PRIu32
" Succ Deletes: %" PRIu32 " Succ relocs: %" PRIu32,
bfptr->PathName().c_str(), gcstats->num_deletes,
gcstats->num_relocs, gcstats->succ_deletes_lsm,
gcstats->succ_relocs);
return s;
}
@ -2123,15 +2187,17 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
// in this collect the set of files, which became obsolete
std::vector<std::shared_ptr<BlobFile>> obsoletes;
for (auto bfile : to_process) {
GCStats gcstats;
Status s = GCFileAndUpdateLSM(bfile, &gcstats);
if (!s.ok()) continue;
GCStats gc_stats;
Status s = GCFileAndUpdateLSM(bfile, &gc_stats);
if (!s.ok()) {
continue;
}
if (bfile->gc_once_after_open_.load()) {
WriteLock lockbfile_w(&bfile->mutex_);
bfile->deleted_size_ = gcstats.deleted_size;
bfile->deleted_count_ = gcstats.num_deletes;
bfile->deleted_size_ = gc_stats.deleted_size;
bfile->deleted_count_ = gc_stats.num_deletes;
bfile->gc_once_after_open_ = false;
} else {
obsoletes.push_back(bfile);

View File

@ -137,10 +137,13 @@ struct GCStats {
uint64_t blob_count = 0;
uint64_t num_deletes = 0;
uint64_t deleted_size = 0;
uint64_t num_relocs = 0;
uint64_t succ_deletes_lsm = 0;
uint64_t overrided_while_delete = 0;
uint64_t succ_relocs = 0;
uint64_t retry_delete = 0;
uint64_t delete_succeeded = 0;
uint64_t overwritten_while_delete = 0;
uint64_t num_relocate = 0;
uint64_t retry_relocate = 0;
uint64_t relocate_succeeded = 0;
uint64_t overwritten_while_relocate = 0;
std::shared_ptr<BlobFile> newfile = nullptr;
};

View File

@ -14,6 +14,7 @@
#include "port/port.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "utilities/blob_db/blob_db_impl.h"
@ -176,7 +177,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
for (size_t i = 0; i < 100; i++) {
uint64_t ttl = rnd.Next() % 100;
PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
(ttl < 50 ? nullptr : &data));
(ttl <= 50 ? nullptr : &data));
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
@ -187,7 +188,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
VerifyDB(data);
}
@ -205,7 +206,7 @@ TEST_F(BlobDBTest, PutUntil) {
for (size_t i = 0; i < 100; i++) {
uint64_t expiration = rnd.Next() % 100 + 50;
PutRandomUntil("key" + ToString(i), expiration, &rnd,
(expiration < 100 ? nullptr : &data));
(expiration <= 100 ? nullptr : &data));
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
@ -216,7 +217,7 @@ TEST_F(BlobDBTest, PutUntil) {
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
VerifyDB(data);
}
@ -248,7 +249,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(100, gc_stats.num_relocs);
ASSERT_EQ(100, gc_stats.num_relocate);
VerifyDB(data);
}
@ -262,7 +263,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
std::string * /*new_value*/,
bool * /*value_changed*/) override {
*ttl = rnd->Next() % 100;
if (*ttl >= 50) {
if (*ttl > 50) {
data[key.ToString()] = value.ToString();
}
return true;
@ -294,7 +295,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
VerifyDB(data);
}
@ -309,7 +310,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
std::string * /*new_value*/,
bool * /*value_changed*/) override {
*expiration = rnd->Next() % 100 + 50;
if (*expiration >= 100) {
if (*expiration > 100) {
data[key.ToString()] = value.ToString();
}
return true;
@ -341,7 +342,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
VerifyDB(data);
}
@ -384,7 +385,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
std::string value_ttl = value + "ttl:";
PutFixed64(&value_ttl, ttl);
ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl)));
if (ttl >= 50) {
if (ttl > 50) {
data[key] = value;
}
}
@ -397,7 +398,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
ASSERT_EQ(data.size(), gc_stats.num_relocate);
VerifyDB(data);
}
@ -533,9 +534,7 @@ TEST_F(BlobDBTest, MultipleWriters) {
i));
std::map<std::string, std::string> data;
for (size_t i = 0; i < 10; i++) {
if (workers[i].joinable()) {
workers[i].join();
}
workers[i].join();
data.insert(data_set[i].begin(), data_set[i].end());
}
VerifyDB(data);
@ -577,7 +576,7 @@ TEST_F(BlobDBTest, SequenceNumber) {
}
}
TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) {
TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
@ -609,11 +608,81 @@ TEST_F(BlobDBTest, GCShouldKeepKeysWithNewerVersion) {
}
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(200, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(200 - new_keys, gc_stats.num_relocs);
ASSERT_EQ(200 - new_keys, gc_stats.num_relocate);
VerifyDB(data);
}
TEST_F(BlobDBTest, GCRelocateKeyWhileOverwritting) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
BlobDBImpl *blob_db_impl = dynamic_cast<BlobDBImpl*>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
SyncPoint::GetInstance()->EnableProcessing();
auto writer = port::Thread(
[this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(1, gc_stats.num_relocate);
ASSERT_EQ(0, gc_stats.relocate_succeeded);
ASSERT_EQ(1, gc_stats.overwritten_while_relocate);
writer.join();
VerifyDB({{"foo", "v2"}});
}
TEST_F(BlobDBTest, GCExpiredKeyWhileOverwritting) {
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_now_micros(100 * 1000000);
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
BlobDBImpl *blob_db_impl = dynamic_cast<BlobDBImpl*>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
blob_db_impl->TEST_CloseBlobFile(blob_files[0]);
mock_env_->set_now_micros(300 * 1000000);
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
SyncPoint::GetInstance()->EnableProcessing();
auto writer = port::Thread([this]() {
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400));
});
GCStats gc_stats;
ASSERT_OK(blob_db_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(1, gc_stats.blob_count);
ASSERT_EQ(1, gc_stats.num_deletes);
ASSERT_EQ(0, gc_stats.delete_succeeded);
ASSERT_EQ(1, gc_stats.overwritten_while_delete);
ASSERT_EQ(0, gc_stats.num_relocate);
writer.join();
VerifyDB({{"foo", "v2"}});
}
} // namespace blob_db
} // namespace rocksdb

View File

@ -41,7 +41,7 @@ Status Reader::ReadHeader(BlobLogHeader* header) {
}
Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
WALRecoveryMode wal_recovery_mode) {
uint64_t* blob_offset) {
record->Clear();
buffer_.clear();
backing_store_[0] = '\0';
@ -65,6 +65,9 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
header_crc = crc32c::Extend(header_crc, buffer_.data(), crc_data_size);
uint64_t kb_size = record->GetKeySize() + record->GetBlobSize();
if (blob_offset != nullptr) {
*blob_offset = next_byte_ + record->GetKeySize();
}
switch (level) {
case kReadHdrFooter:
file_->Skip(kb_size);

View File

@ -60,9 +60,9 @@ class Reader {
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
// If blob_offset is non-null, return offset of the blob through it.
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHdrFooter,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
uint64_t* blob_offset = nullptr);
SequentialFileReader* file() { return file_.get(); }