Blob DB: not writing sequence number as blob record footer

Summary:
Previously each time we write a blob we write blog_record_header + key + value + blob_record_footer to blob log. The footer only contains a sequence and a crc for the sequence number. The sequence number was used in garbage collection to verify the value is recent. After #2703 we moved to use optimistic transaction and no longer use sequence number from the footer. Remove the footer altogether.

There's another usage of sequence number and we are keeping it: Each blob log file keep track of sequence number range of keys in it, and use it to check if it is reference by a snapshot, before being deleted.
Closes https://github.com/facebook/rocksdb/pull/3005

Differential Revision: D6057585

Pulled By: yiwu-arbug

fbshipit-source-id: d6da53c457a316e9723f359a1b47facfc3ffe090
This commit is contained in:
Yi Wu 2017-10-17 12:11:52 -07:00
parent 8afb0036ca
commit 419b93c56f
12 changed files with 140 additions and 360 deletions

View File

@ -47,12 +47,6 @@ void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) {
ts_range->first = std::min(ts_range->first, ts);
ts_range->second = std::max(ts_range->second, ts);
}
void extendSN(rocksdb::blob_db::snrange_t* sn_range,
rocksdb::SequenceNumber sn) {
sn_range->first = std::min(sn_range->first, sn);
sn_range->second = std::max(sn_range->second, sn);
}
} // end namespace
namespace rocksdb {
@ -438,12 +432,10 @@ Status BlobDBImpl::OpenAllFiles() {
std::numeric_limits<uint32_t>::min());
tsrange_t ts_range(std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::min());
snrange_t sn_range(std::numeric_limits<SequenceNumber>::max(),
std::numeric_limits<SequenceNumber>::min());
uint64_t blob_count = 0;
BlobLogRecord record;
Reader::ReadLevel shallow = Reader::kReadHdrKeyFooter;
Reader::ReadLevel shallow = Reader::kReadHeaderKey;
uint64_t record_start = reader->GetNextByte();
// TODO(arahut) - when we detect corruption, we should truncate
@ -455,7 +447,6 @@ Status BlobDBImpl::OpenAllFiles() {
if (bfptr->HasTimestamp()) {
extendTimestamps(&ts_range, record.GetTimeVal());
}
extendSN(&sn_range, record.GetSN());
record_start = reader->GetNextByte();
}
@ -473,16 +464,15 @@ Status BlobDBImpl::OpenAllFiles() {
}
bfptr->SetBlobCount(blob_count);
bfptr->SetSNRange(sn_range);
bfptr->SetSNRange({0, 0});
if (bfptr->HasTimestamp()) bfptr->set_time_range(ts_range);
ROCKS_LOG_INFO(db_options_.info_log,
"Blob File: %s blob_count: %" PRIu64
" size_bytes: %" PRIu64
" sn_range: (%d, %d) ts: %d ttl: %d",
bfpath.c_str(), blob_count, size_bytes, sn_range.first,
sn_range.second, bfptr->HasTimestamp(), bfptr->HasTTL());
" size_bytes: %" PRIu64 " ts: %d ttl: %d",
bfpath.c_str(), blob_count, size_bytes,
bfptr->HasTimestamp(), bfptr->HasTTL());
if (bfptr->HasTTL()) {
ttl_range.second =
@ -566,11 +556,11 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
}
Writer::ElemType et = Writer::kEtNone;
if (bfile->file_size_ == BlobLogHeader::kHeaderSize)
if (bfile->file_size_ == BlobLogHeader::kHeaderSize) {
et = Writer::kEtFileHdr;
else if (bfile->file_size_ > BlobLogHeader::kHeaderSize)
et = Writer::kEtFooter;
else if (bfile->file_size_) {
} else if (bfile->file_size_ > BlobLogHeader::kHeaderSize) {
et = Writer::kEtRecord;
} else if (bfile->file_size_) {
ROCKS_LOG_WARN(db_options_.info_log,
"Open blob file: %s with wrong size: %d", fpath.c_str(),
boffset);
@ -772,14 +762,13 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
BlobDBImpl* impl_;
SequenceNumber sequence_;
WriteBatch updates_blob_;
Status batch_rewrite_status_;
std::shared_ptr<BlobFile> last_file_;
bool has_put_;
std::string new_value_;
uint32_t default_cf_id_;
public:
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
: impl_(impl),
sequence_(seq),
has_put_(false),
@ -788,9 +777,9 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
->cfd()
->GetID()) {}
WriteBatch& updates_blob() { return updates_blob_; }
SequenceNumber sequence() { return sequence_; }
Status batch_rewrite_status() { return batch_rewrite_status_; }
WriteBatch* updates_blob() { return &updates_blob_; }
std::shared_ptr<BlobFile>& last_file() { return last_file_; }
@ -799,9 +788,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value_slice) override {
if (column_family_id != default_cf_id_) {
batch_rewrite_status_ = Status::NotSupported(
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
return batch_rewrite_status_;
}
Slice value_unc;
uint64_t expiration =
@ -812,13 +800,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
? impl_->SelectBlobFileTTL(expiration)
: ((last_file_) ? last_file_ : impl_->SelectBlobFile());
if (last_file_ && last_file_ != bfile) {
batch_rewrite_status_ = Status::NotFound("too many blob files");
return batch_rewrite_status_;
return Status::NotFound("too many blob files");
}
if (!bfile) {
batch_rewrite_status_ = Status::NotFound("blob file not found");
return batch_rewrite_status_;
return Status::NotFound("blob file not found");
}
last_file_ = bfile;
@ -830,31 +816,26 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
std::string index_entry;
Status st = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry);
if (st.ok()) {
impl_->AppendSN(last_file_, sequence_);
sequence_++;
Status s = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry);
if (!s.ok()) {
return s;
}
bfile->ExtendSequenceRange(sequence_);
sequence_++;
if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration);
}
if (!st.ok()) {
batch_rewrite_status_ = st;
} else {
WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
index_entry);
}
return Status::OK();
return WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
index_entry);
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (column_family_id != default_cf_id_) {
batch_rewrite_status_ = Status::NotSupported(
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
return batch_rewrite_status_;
}
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
sequence_++;
@ -864,27 +845,23 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
virtual Status DeleteRange(uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key) {
if (column_family_id != default_cf_id_) {
batch_rewrite_status_ = Status::NotSupported(
return Status::NotSupported(
"Blob DB doesn't support non-default column family.");
return batch_rewrite_status_;
}
WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id,
begin_key, end_key);
sequence_++;
return Status::OK();
}
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
const Slice& /*key*/) override {
batch_rewrite_status_ =
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status_;
return Status::NotSupported("Not supported operation in blob db.");
}
virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
const Slice& /*value*/) override {
batch_rewrite_status_ =
Status::NotSupported("Not supported operation in blob db.");
return batch_rewrite_status_;
return Status::NotSupported("Not supported operation in blob db.");
}
virtual void LogData(const Slice& blob) override {
@ -894,19 +871,20 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
MutexLock l(&write_mutex_);
SequenceNumber sequence = db_impl_->GetLatestSequenceNumber() + 1;
BlobInserter blob_inserter(this, sequence);
updates->Iterate(&blob_inserter);
if (!blob_inserter.batch_rewrite_status().ok()) {
return blob_inserter.batch_rewrite_status();
}
Status s = db_->Write(opts, &(blob_inserter.updates_blob()));
SequenceNumber current_seq = db_impl_->GetLatestSequenceNumber() + 1;
BlobInserter blob_inserter(this, current_seq);
Status s = updates->Iterate(&blob_inserter);
if (!s.ok()) {
return s;
}
s = db_->Write(opts, blob_inserter.updates_blob());
if (!s.ok()) {
return s;
}
assert(current_seq ==
WriteBatchInternal::Sequence(blob_inserter.updates_blob()));
assert(blob_inserter.sequence() ==
current_seq + WriteBatchInternal::Count(blob_inserter.updates_blob()));
if (blob_inserter.has_put()) {
s = CloseBlobFileIfNeeded(blob_inserter.last_file());
if (!s.ok()) {
@ -942,7 +920,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
};
// add deleted key to list of keys that have been deleted for book-keeping
DeleteBookkeeper delete_bookkeeper(this, sequence);
DeleteBookkeeper delete_bookkeeper(this, current_seq);
updates->Iterate(&delete_bookkeeper);
return Status::OK();
@ -1051,20 +1029,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
// this is the sequence number of the write.
SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
if (debug_level_ >= 3)
ROCKS_LOG_INFO(db_options_.info_log, "<Adding KEY FILE: %s: KEY: %s SN: %d",
bfile->PathName().c_str(), key.ToString().c_str(), sn);
s = AppendSN(bfile, sn);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to append SN to FILE: %s: KEY: %s VALSZ: %d"
" status: '%s' blob_file: '%s'",
bfile->PathName().c_str(), key.ToString().c_str(),
value.size(), s.ToString().c_str(),
bfile->DumpState().c_str());
}
bfile->ExtendSequenceRange(sn);
if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration);
@ -1140,32 +1105,6 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
return s;
}
Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile,
const SequenceNumber& sn) {
Status s;
{
WriteLock lockbfile_w(&bfile->mutex_);
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
if (!writer) return Status::IOError("Failed to create blob writer");
s = writer->AddRecordFooter(sn);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Invalid status in AppendSN: %s status: '%s'",
bfile->PathName().c_str(), s.ToString().c_str());
return s;
}
if (sn != std::numeric_limits<SequenceNumber>::max())
extendSN(&(bfile->sn_range_), sn);
}
bfile->file_size_ += BlobLogRecord::kFooterSize;
last_period_write_ += BlobLogRecord::kFooterSize;
total_blob_space_ += BlobLogRecord::kFooterSize;
return s;
}
std::vector<Status> BlobDBImpl::MultiGet(
const ReadOptions& read_options,
const std::vector<Slice>& keys, std::vector<std::string>* values) {
@ -1205,7 +1144,8 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
}
Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
std::string* value, SequenceNumber* sequence) {
std::string* value) {
assert(value != nullptr);
Slice index_entry_slice(index_entry);
BlobHandle handle;
Status s = handle.DecodeFrom(&index_entry_slice);
@ -1249,90 +1189,69 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
std::shared_ptr<RandomAccessFileReader> reader =
GetOrOpenRandomAccessReader(bfile, env_, env_options_);
if (value != nullptr) {
std::string* valueptr = value;
std::string value_c;
if (bdb_options_.compression != kNoCompression) {
valueptr = &value_c;
}
// allocate the buffer. This is safe in C++11
valueptr->resize(handle.size());
char* buffer = &(*valueptr)[0];
Slice blob_value;
s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
if (!s.ok() || blob_value.size() != handle.size()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to read blob from file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
static_cast<int>(blob_value.size()), key.data(),
s.ToString().c_str());
}
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
}
Slice crc_slice;
uint32_t crc_exp;
std::string crc_str;
crc_str.resize(sizeof(uint32_t));
char* crc_buffer = &(crc_str[0]);
s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)),
sizeof(uint32_t), &crc_slice, crc_buffer);
if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(
db_options_.info_log,
"Failed to fetch blob crc file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
key.data(), s.ToString().c_str());
}
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
}
uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size());
crc = crc32c::Mask(crc); // Adjust for storage
if (crc != crc_exp) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob crc mismatch file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(),
handle.size(), key.data(), s.ToString().c_str());
}
return Status::Corruption("Corruption. Blob CRC mismatch");
}
if (bdb_options_.compression != kNoCompression) {
BlockContents contents;
auto cfh =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
s = UncompressBlockContentsForCompressionType(
blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
*(cfh->cfd()->ioptions()));
*value = contents.data.ToString();
}
std::string* valueptr = value;
std::string value_c;
if (bdb_options_.compression != kNoCompression) {
valueptr = &value_c;
}
if (sequence != nullptr) {
char buffer[BlobLogRecord::kFooterSize];
Slice footer_slice;
s = reader->Read(handle.offset() + handle.size(),
BlobLogRecord::kFooterSize, &footer_slice, buffer);
if (!s.ok()) {
return s;
// allocate the buffer. This is safe in C++11
valueptr->resize(handle.size());
char* buffer = &(*valueptr)[0];
Slice blob_value;
s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
if (!s.ok() || blob_value.size() != handle.size()) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to read blob from file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
static_cast<int>(blob_value.size()), key.data(),
s.ToString().c_str());
}
BlobLogRecord record;
s = record.DecodeFooterFrom(footer_slice);
if (!s.ok()) {
return s;
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
}
Slice crc_slice;
uint32_t crc_exp;
std::string crc_str;
crc_str.resize(sizeof(uint32_t));
char* crc_buffer = &(crc_str[0]);
s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)),
sizeof(uint32_t), &crc_slice, crc_buffer);
if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to fetch blob crc file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
key.data(), s.ToString().c_str());
}
*sequence = record.GetSN();
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
}
uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size());
crc = crc32c::Mask(crc); // Adjust for storage
if (crc != crc_exp) {
if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Blob crc mismatch file: %s blob_offset: %" PRIu64
" blob_size: %" PRIu64 " key: %s status: '%s'",
bfile->PathName().c_str(), handle.offset(), handle.size(),
key.data(), s.ToString().c_str());
}
return Status::Corruption("Corruption. Blob CRC mismatch");
}
if (bdb_options_.compression != kNoCompression) {
BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
s = UncompressBlockContentsForCompressionType(
blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
*(cfh->cfd()->ioptions()));
*value = contents.data.ToString();
}
return s;
@ -1488,8 +1407,7 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
WriteLock lockbfile_w(&bfile->mutex_);
bfile->deleted_count_++;
bfile->deleted_size_ += key_size + blob_size + BlobLogRecord::kHeaderSize +
BlobLogRecord::kFooterSize;
bfile->deleted_size_ += key_size + blob_size + BlobLogRecord::kHeaderSize;
return true;
}
@ -1742,7 +1660,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool has_ttl = header.HasTTL();
// this reads the key but skips the blob
Reader::ReadLevel shallow = Reader::kReadHdrKeyFooter;
Reader::ReadLevel shallow = Reader::kReadHeaderKey;
assert(opt_db_);
@ -1759,7 +1677,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool no_relocation = no_relocation_ttl || no_relocation_lsmdel;
if (!no_relocation) {
// read the blob because you have to write it back to new file
shallow = Reader::kReadHdrKeyBlobFooter;
shallow = Reader::kReadHeaderKeyBlob;
}
BlobLogRecord record;
@ -1906,10 +1824,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
new_handle.set_compression(bdb_options_.compression);
new_handle.EncodeTo(&new_index_entry);
new_writer->AddRecordFooter(record.GetSN());
newfile->blob_count_++;
newfile->file_size_ += BlobLogRecord::kHeaderSize + record.Key().size() +
record.Blob().size() + BlobLogRecord::kFooterSize;
newfile->file_size_ +=
BlobLogRecord::kHeaderSize + record.Key().size() + record.Blob().size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
transaction->Put(cfh, record.Key(), new_index_entry);
@ -2105,7 +2022,7 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
ColumnFamilyHandle* cfh = bfile->GetColumnFamily(db_);
BlobLogRecord record;
Reader::ReadLevel full = Reader::kReadHdrKeyBlobFooter;
Reader::ReadLevel full = Reader::kReadHeaderKeyBlob;
while (reader->ReadRecord(&record, full).ok()) {
bdb_options_.gc_evict_cb_fn(cfh, record.Key(), record.Blob());
}
@ -2320,16 +2237,6 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
}
#ifndef NDEBUG
Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
SequenceNumber* sequence) {
std::string index_entry;
Status s = db_->Get(ReadOptions(), key, &index_entry);
if (!s.ok()) {
return s;
}
return CommonGet(key, index_entry, nullptr, sequence);
}
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
ReadLock l(&mutex_);
std::vector<std::shared_ptr<BlobFile>> blob_files;

View File

@ -252,8 +252,6 @@ class BlobDBImpl : public BlobDB {
~BlobDBImpl();
#ifndef NDEBUG
Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
@ -278,7 +276,7 @@ class BlobDBImpl : public BlobDB {
bool SetSnapshotIfNeeded(ReadOptions* read_options);
Status CommonGet(const Slice& key, const std::string& index_entry,
std::string* value, SequenceNumber* sequence = nullptr);
std::string* value);
Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;
@ -310,9 +308,6 @@ class BlobDBImpl : public BlobDB {
const std::string& headerbuf, const Slice& key,
const Slice& value, std::string* index_entry);
Status AppendSN(const std::shared_ptr<BlobFile>& bfile,
const SequenceNumber& sn);
// find an existing blob log file based on the expiration unix epoch
// if such a file does not exist, return nullptr
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration);

View File

@ -530,43 +530,6 @@ TEST_F(BlobDBTest, MultipleWriters) {
VerifyDB(data);
}
// Test sequence number store in blob file is correct.
TEST_F(BlobDBTest, SequenceNumber) {
Random rnd(301);
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
SequenceNumber sequence = blob_db_->GetLatestSequenceNumber();
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
for (int i = 0; i < 100; i++) {
std::string key = "key" + ToString(i);
PutRandom(key, &rnd);
sequence += 1;
ASSERT_EQ(sequence, blob_db_->GetLatestSequenceNumber());
SequenceNumber actual_sequence = 0;
ASSERT_OK(blob_db_impl->TEST_GetSequenceNumber(key, &actual_sequence));
ASSERT_EQ(sequence, actual_sequence);
}
for (int i = 0; i < 100; i++) {
WriteBatch batch;
size_t batch_size = rnd.Next() % 10 + 1;
for (size_t k = 0; k < batch_size; k++) {
std::string value = test::RandomHumanReadableString(&rnd, 1000);
ASSERT_OK(batch.Put("key" + ToString(i) + "-" + ToString(k), value));
}
ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
for (size_t k = 0; k < batch_size; k++) {
std::string key = "key" + ToString(i) + "-" + ToString(k);
sequence++;
SequenceNumber actual_sequence;
ASSERT_OK(blob_db_impl->TEST_GetSequenceNumber(key, &actual_sequence));
ASSERT_EQ(sequence, actual_sequence);
}
ASSERT_EQ(sequence, blob_db_->GetLatestSequenceNumber());
}
}
TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
Random rnd(301);
BlobDBOptions bdb_options;

View File

@ -185,7 +185,7 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
uint32_t header_crc =
crc32c::Extend(0, slice.data(), slice.size() - 2 * sizeof(uint32_t));
*offset += BlobLogRecord::kHeaderSize;
s = Read(*offset, key_size + blob_size + BlobLogRecord::kFooterSize, &slice);
s = Read(*offset, key_size + blob_size, &slice);
if (!s.ok()) {
return s;
}
@ -207,15 +207,7 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
DumpSlice(Slice(slice.data() + key_size, blob_size), show_blob);
}
}
Slice footer_slice(slice.data() + record.GetKeySize() + record.GetBlobSize(),
BlobLogRecord::kFooterSize);
s = record.DecodeFooterFrom(footer_slice);
if (!s.ok()) {
return s;
}
fprintf(stdout, " footer CRC : %" PRIu32 "\n", record.footer_checksum());
fprintf(stdout, " sequence : %" PRIu64 "\n", record.GetSN());
*offset += key_size + blob_size + BlobLogRecord::kFooterSize;
*offset += key_size + blob_size;
return s;
}

View File

@ -5,8 +5,14 @@
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_file.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <stdio.h>
#include <cinttypes>
#include <algorithm>
#include <memory>
#include "util/filename.h"

View File

@ -147,6 +147,11 @@ class BlobFile {
return header_.HasTimestamp();
}
void ExtendSequenceRange(SequenceNumber sequence) {
sn_range_.first = std::min(sn_range_.first, sequence);
sn_range_.second = std::max(sn_range_.second, sequence);
}
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
void Fsync();

View File

@ -224,7 +224,6 @@ BlobLogRecord::BlobLogRecord()
blob_size_(0),
time_val_(0),
ttl_val_(0),
sn_(0),
type_(0),
subtype_(0) {}
@ -249,7 +248,6 @@ void BlobLogRecord::Clear() {
blob_size_ = 0;
time_val_ = 0;
ttl_val_ = 0;
sn_ = 0;
type_ = subtype_ = 0;
key_.clear();
blob_.clear();
@ -289,30 +287,6 @@ Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) {
return Status::OK();
}
Status BlobLogRecord::DecodeFooterFrom(const Slice& footerslice) {
Slice input = footerslice;
if (input.size() < kFooterSize) {
return Status::Corruption("Invalid Blob Record Footer: size");
}
uint32_t f_crc = crc32c::Extend(0, input.data(), 8);
f_crc = crc32c::Mask(f_crc);
if (!GetFixed64(&input, &sn_)) {
return Status::Corruption("Invalid Blob Record Footer: sn");
}
if (!GetFixed32(&input, &footer_cksum_)) {
return Status::Corruption("Invalid Blob Record Footer: cksum");
}
if (f_crc != footer_cksum_) {
return Status::Corruption("Record Checksum mismatch: footer_cksum");
}
return Status::OK();
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -188,8 +188,6 @@ class BlobLogRecord {
uint64_t blob_size_;
uint64_t time_val_;
uint64_t ttl_val_;
SequenceNumber sn_;
uint32_t footer_cksum_;
char type_;
char subtype_;
Slice key_;
@ -218,8 +216,6 @@ class BlobLogRecord {
// = 42
static const size_t kHeaderSize = 4 + 4 + 8 + 8 + 4 + 8 + 1 + 1;
static const size_t kFooterSize = 8 + 4;
public:
BlobLogRecord();
@ -245,17 +241,11 @@ class BlobLogRecord {
char subtype() const { return subtype_; }
SequenceNumber GetSN() const { return sn_; }
uint32_t header_checksum() const { return header_cksum_; }
uint32_t checksum() const { return checksum_; }
uint32_t footer_checksum() const { return footer_cksum_; }
Status DecodeHeaderFrom(const Slice& hdrslice);
Status DecodeFooterFrom(const Slice& footerslice);
};
} // namespace blob_db

View File

@ -69,21 +69,11 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
*blob_offset = next_byte_ + record->GetKeySize();
}
switch (level) {
case kReadHdrFooter:
case kReadHeader:
file_->Skip(kb_size);
next_byte_ += kb_size;
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached before record footer");
}
status = record->DecodeFooterFrom(buffer_);
return status;
case kReadHdrKeyFooter:
case kReadHeaderKey:
record->ResizeKeyBuffer(record->GetKeySize());
status = file_->Read(record->GetKeySize(), &record->key_,
record->GetKeyBuffer());
@ -103,18 +93,7 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
file_->Skip(record->GetBlobSize());
next_byte_ += record->GetBlobSize();
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached during footer read");
}
status = record->DecodeFooterFrom(buffer_);
return status;
case kReadHdrKeyBlobFooter:
case kReadHeaderKeyBlob:
record->ResizeKeyBuffer(record->GetKeySize());
status = file_->Read(record->GetKeySize(), &record->key_,
record->GetKeyBuffer());
@ -146,21 +125,8 @@ Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
if (blob_crc != record->checksum_) {
return Status::Corruption("Blob Checksum mismatch");
}
status =
file_->Read(BlobLogRecord::kFooterSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kFooterSize) {
return Status::IOError("EOF reached during blob footer read");
}
status = record->DecodeFooterFrom(buffer_);
return status;
default:
assert(0);
return status;
}
return status;
}
} // namespace blob_db

View File

@ -32,9 +32,9 @@ namespace blob_db {
class Reader {
public:
enum ReadLevel {
kReadHdrFooter,
kReadHdrKeyFooter,
kReadHdrKeyBlobFooter,
kReadHeader,
kReadHeaderKey,
kReadHeaderKeyBlob,
};
// Create a reader that will return log records from "*file".
@ -61,7 +61,7 @@ class Reader {
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
// If blob_offset is non-null, return offset of the blob through it.
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHdrFooter,
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
uint64_t* blob_offset = nullptr);
SequentialFileReader* file() { return file_.get(); }

View File

@ -2,7 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_writer.h"
@ -53,7 +52,7 @@ Status Writer::WriteHeader(const BlobLogHeader& header) {
Status Writer::AppendFooter(const BlobLogFooter& footer) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string str;
footer.EncodeTo(&str);
@ -73,7 +72,7 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset,
uint64_t ttl) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string buf;
ConstructBlobHeader(&buf, key, val, ttl, -1);
@ -85,7 +84,7 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset) {
assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtFooter);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string buf;
ConstructBlobHeader(&buf, key, val, -1, -1);
@ -134,7 +133,12 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
Status s = dest_->Append(Slice(headerbuf));
if (s.ok()) {
s = dest_->Append(key);
if (s.ok()) s = dest_->Append(val);
}
if (s.ok()) {
s = dest_->Append(val);
}
if (s.ok()) {
s = dest_->Flush();
}
*key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
@ -144,25 +148,6 @@ Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
return s;
}
Status Writer::AddRecordFooter(const SequenceNumber& seq) {
assert(last_elem_type_ == kEtRecord);
std::string buf;
PutFixed64(&buf, seq);
uint32_t footer_crc = crc32c::Extend(0, buf.c_str(), buf.size());
footer_crc = crc32c::Mask(footer_crc);
PutFixed32(&buf, footer_crc);
Status s = dest_->Append(Slice(buf));
block_offset_ += BlobLogRecord::kFooterSize;
if (s.ok()) dest_->Flush();
last_elem_type_ = kEtFooter;
return s;
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -2,7 +2,6 @@
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#ifndef ROCKSDB_LITE
@ -53,8 +52,6 @@ class Writer {
const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset);
Status AddRecordFooter(const SequenceNumber& sn);
Status AppendFooter(const BlobLogFooter& footer);
Status WriteHeader(const BlobLogHeader& header);
@ -89,7 +86,7 @@ class Writer {
Writer& operator=(const Writer&) = delete;
public:
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFooter, kEtFileFooter };
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
ElemType last_elem_type_;
};