Blob DB: update blob file format

Summary:
Change the blob file format, with some code cleanup around the change. The changes to the blob log format are:
* Remove the timestamp field from the blob file header, blob file footer and blob records. The field is not being used and is often confused with the expiration field.
* The blob file header now comes with a column family id, which is always equal to the default column family id. This leaves room for future column family support.
* The compression field in the blob file header is now a standalone byte (instead of being compactly encoded together with the flags field).
* The blob file footer now comes with its own CRC.
* Key length is now uint64_t instead of uint32_t.
* The blob CRC now covers both key and value (instead of value only).
* Some reordering of the fields.

The list of cleanups:
* Better inline comments in blob_log_format.h
* Rename ttlrange_t and snrange_t to ExpirationRange and SequenceRange respectively.
* Simplify blob_db::Reader
* Move crc checking logic to inside blob_log_format.cc
Closes https://github.com/facebook/rocksdb/pull/3081

Differential Revision: D6171304

Pulled By: yiwu-arbug

fbshipit-source-id: e4373e0d39264441b7e2fbd0caba93ddd99ea2af
This commit is contained in:
Yi Wu 2017-10-27 13:14:34 -07:00
parent d66bb21e18
commit 9e82540901
11 changed files with 455 additions and 831 deletions

View File

@ -37,16 +37,6 @@
namespace { namespace {
int kBlockBasedTableVersionFormat = 2; int kBlockBasedTableVersionFormat = 2;
void extendTTL(rocksdb::blob_db::ttlrange_t* ttl_range, uint64_t ttl) {
ttl_range->first = std::min(ttl_range->first, ttl);
ttl_range->second = std::max(ttl_range->second, ttl);
}
void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) {
ts_range->first = std::min(ts_range->first, ts);
ts_range->second = std::max(ts_range->second, ts);
}
} // end namespace } // end namespace
namespace rocksdb { namespace rocksdb {
@ -66,10 +56,12 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs, bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs,
const std::shared_ptr<BlobFile>& rhs) const { const std::shared_ptr<BlobFile>& rhs) const {
if (lhs->ttl_range_.first < rhs->ttl_range_.first) return true; if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
return true;
if (lhs->ttl_range_.first > rhs->ttl_range_.first) return false; }
if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
return false;
}
return lhs->BlobFileNumber() > rhs->BlobFileNumber(); return lhs->BlobFileNumber() > rhs->BlobFileNumber();
} }
@ -332,6 +324,7 @@ Status BlobDBImpl::OpenAllFiles() {
bfpath.c_str(), s1.ToString().c_str(), size_bytes); bfpath.c_str(), s1.ToString().c_str(), size_bytes);
continue; continue;
} }
bfptr->SetHasTTL(bfptr->header_.has_ttl);
bfptr->header_valid_ = true; bfptr->header_valid_ = true;
std::shared_ptr<RandomAccessFileReader> ra_reader = std::shared_ptr<RandomAccessFileReader> ra_reader =
@ -355,10 +348,8 @@ Status BlobDBImpl::OpenAllFiles() {
"File found incomplete (w/o footer) %s", bfpath.c_str()); "File found incomplete (w/o footer) %s", bfpath.c_str());
// sequentially iterate over the file and read all the records // sequentially iterate over the file and read all the records
ttlrange_t ttl_range(std::numeric_limits<uint32_t>::max(), ExpirationRange expiration_range(std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::min()); std::numeric_limits<uint32_t>::min());
tsrange_t ts_range(std::numeric_limits<uint32_t>::max(),
std::numeric_limits<uint32_t>::min());
uint64_t blob_count = 0; uint64_t blob_count = 0;
BlobLogRecord record; BlobLogRecord record;
@ -369,10 +360,10 @@ Status BlobDBImpl::OpenAllFiles() {
while (reader->ReadRecord(&record, shallow).ok()) { while (reader->ReadRecord(&record, shallow).ok()) {
++blob_count; ++blob_count;
if (bfptr->HasTTL()) { if (bfptr->HasTTL()) {
extendTTL(&ttl_range, record.GetTTL()); expiration_range.first =
} std::min(expiration_range.first, record.expiration);
if (bfptr->HasTimestamp()) { expiration_range.second =
extendTimestamps(&ts_range, record.GetTimeVal()); std::max(expiration_range.second, record.expiration);
} }
record_start = reader->GetNextByte(); record_start = reader->GetNextByte();
} }
@ -391,24 +382,21 @@ Status BlobDBImpl::OpenAllFiles() {
} }
bfptr->SetBlobCount(blob_count); bfptr->SetBlobCount(blob_count);
bfptr->SetSNRange({0, 0}); bfptr->SetSequenceRange({0, 0});
if (bfptr->HasTimestamp()) bfptr->set_time_range(ts_range);
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,
"Blob File: %s blob_count: %" PRIu64 "Blob File: %s blob_count: %" PRIu64
" size_bytes: %" PRIu64 " ts: %d ttl: %d", " size_bytes: %" PRIu64 " has_ttl: %d",
bfpath.c_str(), blob_count, size_bytes, bfpath.c_str(), blob_count, size_bytes, bfptr->HasTTL());
bfptr->HasTimestamp(), bfptr->HasTTL());
if (bfptr->HasTTL()) { if (bfptr->HasTTL()) {
ttl_range.second = expiration_range.second = std::max(
std::max(ttl_range.second, expiration_range.second,
ttl_range.first + (uint32_t)bdb_options_.ttl_range_secs); expiration_range.first + (uint32_t)bdb_options_.ttl_range_secs);
bfptr->set_ttl_range(ttl_range); bfptr->set_expiration_range(expiration_range);
uint64_t now = EpochNow(); uint64_t now = EpochNow();
if (ttl_range.second < now) { if (expiration_range.second < now) {
Status fstatus = CreateWriterLocked(bfptr); Status fstatus = CreateWriterLocked(bfptr);
if (fstatus.ok()) fstatus = bfptr->WriteFooterAndCloseLocked(); if (fstatus.ok()) fstatus = bfptr->WriteFooterAndCloseLocked();
if (!fstatus.ok()) { if (!fstatus.ok()) {
@ -418,10 +406,11 @@ Status BlobDBImpl::OpenAllFiles() {
bfpath.c_str(), fstatus.ToString().c_str()); bfpath.c_str(), fstatus.ToString().c_str());
continue; continue;
} else { } else {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(
"Blob File Closed: %s now: %d ttl_range: (%d, %d)", db_options_.info_log,
bfpath.c_str(), now, ttl_range.first, "Blob File Closed: %s now: %d expiration_range: (%d, %d)",
ttl_range.second); bfpath.c_str(), now, expiration_range.first,
expiration_range.second);
} }
} else { } else {
open_blob_files_.insert(bfptr); open_blob_files_.insert(bfptr);
@ -483,9 +472,9 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
} }
Writer::ElemType et = Writer::kEtNone; Writer::ElemType et = Writer::kEtNone;
if (bfile->file_size_ == BlobLogHeader::kHeaderSize) { if (bfile->file_size_ == BlobLogHeader::kSize) {
et = Writer::kEtFileHdr; et = Writer::kEtFileHdr;
} else if (bfile->file_size_ > BlobLogHeader::kHeaderSize) { } else if (bfile->file_size_ > BlobLogHeader::kSize) {
et = Writer::kEtRecord; et = Writer::kEtRecord;
} else if (bfile->file_size_) { } else if (bfile->file_size_) {
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
@ -507,14 +496,14 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
if (open_blob_files_.empty()) return nullptr; if (open_blob_files_.empty()) return nullptr;
std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>(); std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
tmp->ttl_range_ = std::make_pair(expiration, 0); tmp->expiration_range_ = std::make_pair(expiration, 0);
auto citr = open_blob_files_.equal_range(tmp); auto citr = open_blob_files_.equal_range(tmp);
if (citr.first == open_blob_files_.end()) { if (citr.first == open_blob_files_.end()) {
assert(citr.second == open_blob_files_.end()); assert(citr.second == open_blob_files_.end());
std::shared_ptr<BlobFile> check = *(open_blob_files_.rbegin()); std::shared_ptr<BlobFile> check = *(open_blob_files_.rbegin());
return (check->ttl_range_.second < expiration) ? nullptr : check; return (check->expiration_range_.second < expiration) ? nullptr : check;
} }
if (citr.first != citr.second) return *(citr.first); if (citr.first != citr.second) return *(citr.first);
@ -522,8 +511,8 @@ std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
auto finditr = citr.second; auto finditr = citr.second;
if (finditr != open_blob_files_.begin()) --finditr; if (finditr != open_blob_files_.begin()) --finditr;
bool b2 = (*finditr)->ttl_range_.second < expiration; bool b2 = (*finditr)->expiration_range_.second < expiration;
bool b1 = (*finditr)->ttl_range_.first > expiration; bool b1 = (*finditr)->expiration_range_.first > expiration;
return (b1 || b2) ? nullptr : (*finditr); return (b1 || b2) ? nullptr : (*finditr);
} }
@ -560,9 +549,11 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
return nullptr; return nullptr;
} }
bfile->file_size_ = BlobLogHeader::kHeaderSize; bfile->file_size_ = BlobLogHeader::kSize;
bfile->header_.compression_ = bdb_options_.compression; bfile->header_.compression = bdb_options_.compression;
bfile->header_.has_ttl = false;
bfile->header_valid_ = true; bfile->header_valid_ = true;
bfile->SetHasTTL(false);
// CHECK again // CHECK again
WriteLock wl(&mutex_); WriteLock wl(&mutex_);
@ -603,7 +594,7 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
uint64_t exp_low = uint64_t exp_low =
(expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs; (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs; uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
ttlrange_t ttl_guess = std::make_pair(exp_low, exp_high); ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
bfile = NewBlobFile("SelectBlobFileTTL"); bfile = NewBlobFile("SelectBlobFileTTL");
assert(bfile); assert(bfile);
@ -621,14 +612,16 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
return nullptr; return nullptr;
} }
bfile->header_.set_ttl_guess(ttl_guess); bfile->header_.expiration_range = expiration_range;
bfile->header_.compression_ = bdb_options_.compression; bfile->header_.compression = bdb_options_.compression;
bfile->header_.has_ttl = true;
bfile->header_valid_ = true; bfile->header_valid_ = true;
bfile->file_size_ = BlobLogHeader::kHeaderSize; bfile->SetHasTTL(true);
bfile->file_size_ = BlobLogHeader::kSize;
// set the first value of the range, since that is // set the first value of the range, since that is
// concrete at this time. also necessary to add to open_blob_files_ // concrete at this time. also necessary to add to open_blob_files_
bfile->ttl_range_ = ttl_guess; bfile->expiration_range_ = expiration_range;
WriteLock wl(&mutex_); WriteLock wl(&mutex_);
// in case the epoch has shifted in the interim, then check // in case the epoch has shifted in the interim, then check
@ -878,8 +871,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
Slice value_compressed = GetCompressedSlice(value, &compression_output); Slice value_compressed = GetCompressedSlice(value, &compression_output);
std::string headerbuf; std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration, Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
-1);
s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
&index_entry); &index_entry);
@ -887,7 +879,7 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
if (s.ok()) { if (s.ok()) {
bfile->ExtendSequenceRange(sequence); bfile->ExtendSequenceRange(sequence);
if (expiration != kNoExpiration) { if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration); bfile->ExtendExpirationRange(expiration);
} }
s = CloseBlobFileIfNeeded(bfile); s = CloseBlobFileIfNeeded(bfile);
if (s.ok()) { if (s.ok()) {
@ -1045,7 +1037,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
// later from the Blob Header, which needs to be also a // later from the Blob Header, which needs to be also a
// valid offset. // valid offset.
if (blob_index.offset() < if (blob_index.offset() <
(BlobLogHeader::kHeaderSize + BlobLogRecord::kHeaderSize + key.size())) { (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
if (debug_level_ >= 2) { if (debug_level_ >= 2) {
ROCKS_LOG_ERROR(db_options_.info_log, ROCKS_LOG_ERROR(db_options_.info_log,
"Invalid blob index file_number: %" PRIu64 "Invalid blob index file_number: %" PRIu64
@ -1085,7 +1077,9 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
valueptr = &value_c; valueptr = &value_c;
} }
// allocate the buffer. This is safe in C++11 // Allocate the buffer. This is safe in C++11
// Note that std::string::reserved() does not work, since previous value
// of the buffer can be larger than blob_index.size().
valueptr->resize(blob_index.size()); valueptr->resize(blob_index.size());
char* buffer = &(*valueptr)[0]; char* buffer = &(*valueptr)[0];
@ -1103,6 +1097,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status::NotFound("Blob Not Found as couldnt retrieve Blob"); return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
} }
// TODO(yiwu): Add an option to skip crc checking.
Slice crc_slice; Slice crc_slice;
uint32_t crc_exp; uint32_t crc_exp;
std::string crc_str; std::string crc_str;
@ -1121,7 +1116,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status::NotFound("Blob Not Found as couldnt retrieve CRC"); return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
} }
uint32_t crc = crc32c::Extend(0, blob_value.data(), blob_value.size()); uint32_t crc = crc32c::Value(key.data(), key.size());
crc = crc32c::Extend(crc, blob_value.data(), blob_value.size());
crc = crc32c::Mask(crc); // Adjust for storage crc = crc32c::Mask(crc); // Adjust for storage
if (crc != crc_exp) { if (crc != crc_exp) {
if (debug_level_ >= 2) { if (debug_level_ >= 2) {
@ -1134,6 +1130,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
return Status::Corruption("Corruption. Blob CRC mismatch"); return Status::Corruption("Corruption. Blob CRC mismatch");
} }
// TODO(yiwu): Should use compression flag in the blob file instead of
// current compression option.
if (bdb_options_.compression != kNoCompression) { if (bdb_options_.compression != kNoCompression) {
BlockContents contents; BlockContents contents;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
@ -1204,7 +1202,7 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
"Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64, "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64,
bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(), bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(),
bfile->deleted_count_, bfile->deleted_size_, bfile->deleted_count_, bfile->deleted_size_,
(bfile->ttl_range_.second - epoch_now)); (bfile->expiration_range_.second - epoch_now));
} }
// reschedule // reschedule
@ -1255,7 +1253,7 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
const std::shared_ptr<BlobFile>& bfile) { const std::shared_ptr<BlobFile>& bfile) {
assert(bfile->Obsolete()); assert(bfile->Obsolete());
SequenceNumber esn = bfile->GetSNRange().first; SequenceNumber esn = bfile->GetSequenceRange().first;
// TODO(yiwu): Here we should check instead if there is an active snapshot // TODO(yiwu): Here we should check instead if there is an active snapshot
// lies between the first sequence in the file, and the last sequence by // lies between the first sequence in the file, and the last sequence by
@ -1412,7 +1410,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
{ {
ReadLock lockbfile_r(&bfile->mutex_); ReadLock lockbfile_r(&bfile->mutex_);
if (bfile->ttl_range_.second > epoch_now) continue; if (bfile->expiration_range_.second > epoch_now) continue;
process_files.push_back(bfile); process_files.push_back(bfile);
} }
} }
@ -1586,22 +1584,24 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool first_gc = bfptr->gc_once_after_open_; bool first_gc = bfptr->gc_once_after_open_;
auto* cfh = bfptr->GetColumnFamily(db_); auto* cfh =
db_impl_->GetColumnFamilyHandleUnlocked(bfptr->column_family_id());
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto column_family_id = cfd->GetID(); auto column_family_id = cfd->GetID();
bool has_ttl = header.HasTTL(); bool has_ttl = header.has_ttl;
// this reads the key but skips the blob // this reads the key but skips the blob
Reader::ReadLevel shallow = Reader::kReadHeaderKey; Reader::ReadLevel shallow = Reader::kReadHeaderKey;
bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second); bool no_relocation_ttl =
(has_ttl && now >= bfptr->GetExpirationRange().second);
bool no_relocation_lsmdel = false; bool no_relocation_lsmdel = false;
{ {
ReadLock lockbfile_r(&bfptr->mutex_); ReadLock lockbfile_r(&bfptr->mutex_);
no_relocation_lsmdel = (bfptr->GetFileSize() == no_relocation_lsmdel =
(BlobLogHeader::kHeaderSize + bfptr->deleted_size_ + (bfptr->GetFileSize() ==
BlobLogFooter::kFooterSize)); (BlobLogHeader::kSize + bfptr->deleted_size_ + BlobLogFooter::kSize));
} }
bool no_relocation = no_relocation_ttl || no_relocation_lsmdel; bool no_relocation = no_relocation_ttl || no_relocation_lsmdel;
@ -1640,7 +1640,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool is_blob_index = false; bool is_blob_index = false;
PinnableSlice index_entry; PinnableSlice index_entry;
Status get_status = db_impl_->GetImpl( Status get_status = db_impl_->GetImpl(
ReadOptions(), cfh, record.Key(), &index_entry, nullptr /*value_found*/, ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
&is_blob_index); &is_blob_index);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB"); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
if (!get_status.ok() && !get_status.ok()) { if (!get_status.ok() && !get_status.ok()) {
@ -1671,15 +1671,15 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
continue; continue;
} }
GarbageCollectionWriteCallback callback(cfd, record.Key(), latest_seq); GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
// If key has expired, remove it from base DB. // If key has expired, remove it from base DB.
if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) { if (no_relocation_ttl || (has_ttl && now >= record.expiration)) {
gc_stats->num_deletes++; gc_stats->num_deletes++;
gc_stats->deleted_size += record.GetBlobSize(); gc_stats->deleted_size += record.value_size;
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
WriteBatch delete_batch; WriteBatch delete_batch;
Status delete_status = delete_batch.Delete(record.Key()); Status delete_status = delete_batch.Delete(record.key);
if (delete_status.ok()) { if (delete_status.ok()) {
delete_status = db_impl_->WriteWithCallback(WriteOptions(), delete_status = db_impl_->WriteWithCallback(WriteOptions(),
&delete_batch, &callback); &delete_batch, &callback);
@ -1718,7 +1718,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
newfile->header_ = std::move(header); newfile->header_ = std::move(header);
// Can't use header beyond this point // Can't use header beyond this point
newfile->header_valid_ = true; newfile->header_valid_ = true;
newfile->file_size_ = BlobLogHeader::kHeaderSize; newfile->file_size_ = BlobLogHeader::kSize;
s = new_writer->WriteHeader(newfile->header_); s = new_writer->WriteHeader(newfile->header_);
if (!s.ok()) { if (!s.ok()) {
@ -1740,21 +1740,21 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
uint64_t new_blob_offset = 0; uint64_t new_blob_offset = 0;
uint64_t new_key_offset = 0; uint64_t new_key_offset = 0;
// write the blob to the blob log. // write the blob to the blob log.
s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset, s = new_writer->AddRecord(record.key, record.value, record.expiration,
&new_blob_offset, record.GetTTL()); &new_key_offset, &new_blob_offset);
BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(), BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
new_blob_offset, record.Blob().size(), new_blob_offset, record.value.size(),
bdb_options_.compression); bdb_options_.compression);
newfile->blob_count_++; newfile->blob_count_++;
newfile->file_size_ += newfile->file_size_ +=
BlobLogRecord::kHeaderSize + record.Key().size() + record.Blob().size(); BlobLogRecord::kHeaderSize + record.key.size() + record.value.size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
WriteBatch rewrite_batch; WriteBatch rewrite_batch;
Status rewrite_status = WriteBatchInternal::PutBlobIndex( Status rewrite_status = WriteBatchInternal::PutBlobIndex(
&rewrite_batch, column_family_id, record.Key(), new_index_entry); &rewrite_batch, column_family_id, record.key, new_index_entry);
if (rewrite_status.ok()) { if (rewrite_status.ok()) {
rewrite_status = db_impl_->WriteWithCallback(WriteOptions(), rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
&rewrite_batch, &callback); &rewrite_batch, &callback);
@ -1797,8 +1797,8 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
bool is_oldest_simple_blob_file, bool is_oldest_simple_blob_file,
std::string* reason) { std::string* reason) {
if (bfile->HasTTL()) { if (bfile->HasTTL()) {
ttlrange_t ttl_range = bfile->GetTTLRange(); ExpirationRange expiration_range = bfile->GetExpirationRange();
if (now > ttl_range.second) { if (now > expiration_range.second) {
*reason = "entire file ttl expired"; *reason = "entire file ttl expired";
return true; return true;
} }
@ -1941,11 +1941,12 @@ bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
return false; return false;
} }
ColumnFamilyHandle* cfh = bfile->GetColumnFamily(db_); ColumnFamilyHandle* cfh =
db_impl_->GetColumnFamilyHandleUnlocked(bfile->column_family_id());
BlobLogRecord record; BlobLogRecord record;
Reader::ReadLevel full = Reader::kReadHeaderKeyBlob; Reader::ReadLevel full = Reader::kReadHeaderKeyBlob;
while (reader->ReadRecord(&record, full).ok()) { while (reader->ReadRecord(&record, full).ok()) {
bdb_options_.gc_evict_cb_fn(cfh, record.Key(), record.Blob()); bdb_options_.gc_evict_cb_fn(cfh, record.key, record.value);
} }
return true; return true;
@ -2038,15 +2039,15 @@ void BlobDBImpl::FilterSubsetOfFiles(
"File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64 "File has been skipped for GC ttl %s %" PRIu64 " %" PRIu64
" reason='%s'", " reason='%s'",
bfile->PathName().c_str(), now, bfile->PathName().c_str(), now,
bfile->GetTTLRange().second, reason.c_str()); bfile->GetExpirationRange().second, reason.c_str());
continue; continue;
} }
ROCKS_LOG_INFO(db_options_.info_log, ROCKS_LOG_INFO(db_options_.info_log,
"File has been chosen for GC ttl %s %" PRIu64 " %" PRIu64 "File has been chosen for GC ttl %s %" PRIu64 " %" PRIu64
" reason='%s'", " reason='%s'",
bfile->PathName().c_str(), now, bfile->GetTTLRange().second, bfile->PathName().c_str(), now,
reason.c_str()); bfile->GetExpirationRange().second, reason.c_str());
to_process->push_back(bfile); to_process->push_back(bfile);
} }
} }

View File

@ -1012,11 +1012,11 @@ TEST_F(BlobDBTest, InlineSmallValues) {
ttl_file = blob_files[1]; ttl_file = blob_files[1];
} }
ASSERT_FALSE(non_ttl_file->HasTTL()); ASSERT_FALSE(non_ttl_file->HasTTL());
ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSNRange().first); ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSequenceRange().first);
ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSNRange().second); ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSequenceRange().second);
ASSERT_TRUE(ttl_file->HasTTL()); ASSERT_TRUE(ttl_file->HasTTL());
ASSERT_EQ(first_ttl_seq, ttl_file->GetSNRange().first); ASSERT_EQ(first_ttl_seq, ttl_file->GetSequenceRange().first);
ASSERT_EQ(last_ttl_seq, ttl_file->GetSNRange().second); ASSERT_EQ(last_ttl_seq, ttl_file->GetSequenceRange().second);
} }
} // namespace blob_db } // namespace blob_db

View File

@ -18,7 +18,6 @@
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h"
#include "util/string_util.h" #include "util/string_util.h"
namespace rocksdb { namespace rocksdb {
@ -92,7 +91,7 @@ Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) {
Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset) { Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset) {
Slice slice; Slice slice;
Status s = Read(0, BlobLogHeader::kHeaderSize, &slice); Status s = Read(0, BlobLogHeader::kSize, &slice);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -102,20 +101,19 @@ Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset) {
return s; return s;
} }
fprintf(stdout, "Blob log header:\n"); fprintf(stdout, "Blob log header:\n");
fprintf(stdout, " Magic Number : %" PRIu32 "\n", header.magic_number()); fprintf(stdout, " Version : %" PRIu32 "\n", header.version);
fprintf(stdout, " Version : %" PRIu32 "\n", header.version()); fprintf(stdout, " Column Family ID : %" PRIu32 "\n",
CompressionType compression = header.compression(); header.column_family_id);
std::string compression_str; std::string compression_str;
if (!GetStringFromCompressionType(&compression_str, compression).ok()) { if (!GetStringFromCompressionType(&compression_str, header.compression)
.ok()) {
compression_str = "Unrecongnized compression type (" + compression_str = "Unrecongnized compression type (" +
ToString((int)header.compression()) + ")"; ToString((int)header.compression) + ")";
} }
fprintf(stdout, " Compression : %s\n", compression_str.c_str()); fprintf(stdout, " Compression : %s\n", compression_str.c_str());
fprintf(stdout, " TTL Range : %s\n", fprintf(stdout, " Expiration range : %s\n",
GetString(header.ttl_range()).c_str()); GetString(header.expiration_range).c_str());
fprintf(stdout, " Timestamp Range: %s\n", *offset = BlobLogHeader::kSize;
GetString(header.ts_range()).c_str());
*offset = BlobLogHeader::kHeaderSize;
return s; return s;
} }
@ -126,20 +124,12 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size,
fprintf(stdout, "No blob log footer.\n"); fprintf(stdout, "No blob log footer.\n");
return Status::OK(); return Status::OK();
}; };
if (file_size < BlobLogHeader::kHeaderSize + BlobLogFooter::kFooterSize) { if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
return no_footer(); return no_footer();
} }
Slice slice; Slice slice;
Status s = Read(file_size - 4, 4, &slice); *footer_offset = file_size - BlobLogFooter::kSize;
if (!s.ok()) { Status s = Read(*footer_offset, BlobLogFooter::kSize, &slice);
return s;
}
uint32_t magic_number = DecodeFixed32(slice.data());
if (magic_number != kMagicNumber) {
return no_footer();
}
*footer_offset = file_size - BlobLogFooter::kFooterSize;
s = Read(*footer_offset, BlobLogFooter::kFooterSize, &slice);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
@ -149,13 +139,11 @@ Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size,
return s; return s;
} }
fprintf(stdout, "Blob log footer:\n"); fprintf(stdout, "Blob log footer:\n");
fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.GetBlobCount()); fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count);
fprintf(stdout, " TTL Range : %s\n", fprintf(stdout, " Expiration Range : %s\n",
GetString(footer.GetTTLRange()).c_str()); GetString(footer.expiration_range).c_str());
fprintf(stdout, " Time Range : %s\n", fprintf(stdout, " Sequence Range : %s\n",
GetString(footer.GetTimeRange()).c_str()); GetString(footer.sequence_range).c_str());
fprintf(stdout, " Sequence Range : %s\n",
GetString(footer.GetSNRange()).c_str());
return s; return s;
} }
@ -173,41 +161,25 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
uint32_t key_size = record.GetKeySize(); uint64_t key_size = record.key_size;
uint64_t blob_size = record.GetBlobSize(); uint64_t value_size = record.value_size;
fprintf(stdout, " key size : %" PRIu32 "\n", key_size); fprintf(stdout, " key size : %" PRIu64 "\n", key_size);
fprintf(stdout, " blob size : %" PRIu64 "\n", record.GetBlobSize()); fprintf(stdout, " value size : %" PRIu64 "\n", value_size);
fprintf(stdout, " TTL : %" PRIu64 "\n", record.GetTTL()); fprintf(stdout, " expiration : %" PRIu64 "\n", record.expiration);
fprintf(stdout, " time : %" PRIu64 "\n", record.GetTimeVal());
fprintf(stdout, " type : %d, %d\n", record.type(), record.subtype());
fprintf(stdout, " header CRC : %" PRIu32 "\n", record.header_checksum());
fprintf(stdout, " CRC : %" PRIu32 "\n", record.checksum());
uint32_t header_crc =
crc32c::Extend(0, slice.data(), slice.size() - 2 * sizeof(uint32_t));
*offset += BlobLogRecord::kHeaderSize; *offset += BlobLogRecord::kHeaderSize;
s = Read(*offset, key_size + blob_size, &slice); s = Read(*offset, key_size + value_size, &slice);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
header_crc = crc32c::Extend(header_crc, slice.data(), key_size);
header_crc = crc32c::Mask(header_crc);
if (header_crc != record.header_checksum()) {
return Status::Corruption("Record header checksum mismatch.");
}
uint32_t blob_crc = crc32c::Extend(0, slice.data() + key_size, blob_size);
blob_crc = crc32c::Mask(blob_crc);
if (blob_crc != record.checksum()) {
return Status::Corruption("Blob checksum mismatch.");
}
if (show_key != DisplayType::kNone) { if (show_key != DisplayType::kNone) {
fprintf(stdout, " key : "); fprintf(stdout, " key : ");
DumpSlice(Slice(slice.data(), key_size), show_key); DumpSlice(Slice(slice.data(), key_size), show_key);
if (show_blob != DisplayType::kNone) { if (show_blob != DisplayType::kNone) {
fprintf(stdout, " blob : "); fprintf(stdout, " blob : ");
DumpSlice(Slice(slice.data() + key_size, blob_size), show_blob); DumpSlice(Slice(slice.data() + key_size, value_size), show_blob);
} }
} }
*offset += key_size + blob_size; *offset += key_size + value_size;
return s; return s;
} }

View File

@ -15,6 +15,8 @@
#include <algorithm> #include <algorithm>
#include <memory> #include <memory>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "util/filename.h" #include "util/filename.h"
#include "util/logging.h" #include "util/logging.h"
@ -27,6 +29,7 @@ namespace blob_db {
BlobFile::BlobFile() BlobFile::BlobFile()
: parent_(nullptr), : parent_(nullptr),
file_number_(0), file_number_(0),
has_ttl_(false),
blob_count_(0), blob_count_(0),
gc_epoch_(-1), gc_epoch_(-1),
file_size_(0), file_size_(0),
@ -35,9 +38,8 @@ BlobFile::BlobFile()
closed_(false), closed_(false),
can_be_deleted_(false), can_be_deleted_(false),
gc_once_after_open_(false), gc_once_after_open_(false),
ttl_range_(std::make_pair(0, 0)), expiration_range_({0, 0}),
time_range_(std::make_pair(0, 0)), sequence_range_({kMaxSequenceNumber, 0}),
sn_range_(std::make_pair(kMaxSequenceNumber, 0)),
last_access_(-1), last_access_(-1),
last_fsync_(0), last_fsync_(0),
header_valid_(false) {} header_valid_(false) {}
@ -46,6 +48,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
: parent_(p), : parent_(p),
path_to_dir_(bdir), path_to_dir_(bdir),
file_number_(fn), file_number_(fn),
has_ttl_(false),
blob_count_(0), blob_count_(0),
gc_epoch_(-1), gc_epoch_(-1),
file_size_(0), file_size_(0),
@ -54,9 +57,8 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
closed_(false), closed_(false),
can_be_deleted_(false), can_be_deleted_(false),
gc_once_after_open_(false), gc_once_after_open_(false),
ttl_range_(std::make_pair(0, 0)), expiration_range_({0, 0}),
time_range_(std::make_pair(0, 0)), sequence_range_({kMaxSequenceNumber, 0}),
sn_range_(std::make_pair(kMaxSequenceNumber, 0)),
last_access_(-1), last_access_(-1),
last_fsync_(0), last_fsync_(0),
header_valid_(false) {} header_valid_(false) {}
@ -72,6 +74,13 @@ BlobFile::~BlobFile() {
} }
} }
// Returns the id of the column family this blob file is associated with.
// Currently this is the id of the parent DB's default column family.
uint32_t BlobFile::column_family_id() const {
  // TODO(yiwu): Should return column family id encoded in blob file after
  // we add blob db column family support.
  auto* default_cf =
      reinterpret_cast<ColumnFamilyHandle*>(parent_->DefaultColumnFamily());
  return default_cf->GetID();
}
std::string BlobFile::PathName() const { std::string BlobFile::PathName() const {
return BlobFileName(path_to_dir_, file_number_); return BlobFileName(path_to_dir_, file_number_);
} }
@ -101,13 +110,14 @@ std::string BlobFile::DumpState() const {
"path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64 "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " gc_epoch: %" PRIu64
" file_size: %" PRIu64 " deleted_count: %" PRIu64 " file_size: %" PRIu64 " deleted_count: %" PRIu64
" deleted_size: %" PRIu64 " deleted_size: %" PRIu64
" closed: %d can_be_deleted: %d ttl_range: (%" PRIu64 ", %" PRIu64 " closed: %d can_be_deleted: %d expiration_range: (%" PRIu64
") sn_range: (%" PRIu64 " %" PRIu64 "), writer: %d reader: %d", ", %" PRIu64 ") sequence_range: (%" PRIu64 " %" PRIu64
"), writer: %d reader: %d",
path_to_dir_.c_str(), file_number_, blob_count_.load(), path_to_dir_.c_str(), file_number_, blob_count_.load(),
gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_, gc_epoch_.load(), file_size_.load(), deleted_count_, deleted_size_,
closed_.load(), can_be_deleted_.load(), ttl_range_.first, closed_.load(), can_be_deleted_.load(), expiration_range_.first,
ttl_range_.second, sn_range_.first, sn_range_.second, expiration_range_.second, sequence_range_.first,
(!!log_writer_), (!!ra_file_reader_)); sequence_range_.second, (!!log_writer_), (!!ra_file_reader_));
return str; return str;
} }
@ -122,17 +132,18 @@ Status BlobFile::WriteFooterAndCloseLocked() {
"File is being closed after footer %s", PathName().c_str()); "File is being closed after footer %s", PathName().c_str());
BlobLogFooter footer; BlobLogFooter footer;
footer.blob_count_ = blob_count_; footer.blob_count = blob_count_;
if (HasTTL()) footer.set_ttl_range(ttl_range_); if (HasTTL()) {
footer.expiration_range = expiration_range_;
}
footer.sn_range_ = sn_range_; footer.sequence_range = sequence_range_;
if (HasTimestamp()) footer.set_time_range(time_range_);
// this will close the file and reset the Writable File Pointer. // this will close the file and reset the Writable File Pointer.
Status s = log_writer_->AppendFooter(footer); Status s = log_writer_->AppendFooter(footer);
if (s.ok()) { if (s.ok()) {
closed_ = true; closed_ = true;
file_size_ += BlobLogFooter::kFooterSize; file_size_ += BlobLogFooter::kSize;
} else { } else {
ROCKS_LOG_ERROR(parent_->db_options_.info_log, ROCKS_LOG_ERROR(parent_->db_options_.info_log,
"Failure to read Header for blob-file %s", "Failure to read Header for blob-file %s",
@ -144,20 +155,20 @@ Status BlobFile::WriteFooterAndCloseLocked() {
} }
Status BlobFile::ReadFooter(BlobLogFooter* bf) { Status BlobFile::ReadFooter(BlobLogFooter* bf) {
if (file_size_ < (BlobLogHeader::kHeaderSize + BlobLogFooter::kFooterSize)) { if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
return Status::IOError("File does not have footer", PathName()); return Status::IOError("File does not have footer", PathName());
} }
uint64_t footer_offset = file_size_ - BlobLogFooter::kFooterSize; uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
// assume that ra_file_reader_ is valid before we enter this // assume that ra_file_reader_ is valid before we enter this
assert(ra_file_reader_); assert(ra_file_reader_);
Slice result; Slice result;
char scratch[BlobLogFooter::kFooterSize + 10]; char scratch[BlobLogFooter::kSize + 10];
Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kFooterSize, Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
&result, scratch); scratch);
if (!s.ok()) return s; if (!s.ok()) return s;
if (result.size() != BlobLogFooter::kFooterSize) { if (result.size() != BlobLogFooter::kSize) {
// should not happen // should not happen
return Status::IOError("EOF reached before footer"); return Status::IOError("EOF reached before footer");
} }
@ -167,21 +178,12 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) {
} }
Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
if (footer.HasTTL() != header_.HasTTL()) {
return Status::Corruption("has_ttl mismatch");
}
if (footer.HasTimestamp() != header_.HasTimestamp()) {
return Status::Corruption("has_ts mismatch");
}
// assume that file has been fully fsync'd // assume that file has been fully fsync'd
last_fsync_.store(file_size_); last_fsync_.store(file_size_);
blob_count_ = footer.GetBlobCount(); blob_count_ = footer.blob_count;
ttl_range_ = footer.GetTTLRange(); expiration_range_ = footer.expiration_range;
time_range_ = footer.GetTimeRange(); sequence_range_ = footer.sequence_range;
sn_range_ = footer.GetSNRange();
closed_ = true; closed_ = true;
return Status::OK(); return Status::OK();
} }
@ -229,10 +231,6 @@ std::shared_ptr<RandomAccessFileReader> BlobFile::GetOrOpenRandomAccessReader(
return ra_file_reader_; return ra_file_reader_;
} }
ColumnFamilyHandle* BlobFile::GetColumnFamily(DB* db) {
return db->DefaultColumnFamily();
}
} // namespace blob_db } // namespace blob_db
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

View File

@ -13,11 +13,14 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"
#include "utilities/blob_db/blob_log_reader.h"
#include "utilities/blob_db/blob_log_writer.h" #include "utilities/blob_db/blob_log_writer.h"
namespace rocksdb { namespace rocksdb {
namespace blob_db { namespace blob_db {
class BlobDBImpl;
class BlobFile { class BlobFile {
friend class BlobDBImpl; friend class BlobDBImpl;
friend struct blobf_compare_ttl; friend struct blobf_compare_ttl;
@ -34,6 +37,10 @@ class BlobFile {
// after that // after that
uint64_t file_number_; uint64_t file_number_;
// If true, all the keys in this file have TTL. Otherwise none of the keys
// have TTL.
bool has_ttl_;
// number of blobs in the file // number of blobs in the file
std::atomic<uint64_t> blob_count_; std::atomic<uint64_t> blob_count_;
@ -62,14 +69,9 @@ class BlobFile {
// should this file been gc'd once to reconcile lost deletes/compactions // should this file been gc'd once to reconcile lost deletes/compactions
std::atomic<bool> gc_once_after_open_; std::atomic<bool> gc_once_after_open_;
// et - lt of the blobs ExpirationRange expiration_range_;
ttlrange_t ttl_range_;
// et - lt of the timestamp of the KV pairs. SequenceRange sequence_range_;
tsrange_t time_range_;
// ESN - LSN of the blobs
snrange_t sn_range_;
// Sequential/Append writer for blobs // Sequential/Append writer for blobs
std::shared_ptr<Writer> log_writer_; std::shared_ptr<Writer> log_writer_;
@ -96,7 +98,7 @@ class BlobFile {
~BlobFile(); ~BlobFile();
ColumnFamilyHandle* GetColumnFamily(DB* db); uint32_t column_family_id() const;
// Returns log file's pathname relative to the main db dir // Returns log file's pathname relative to the main db dir
// Eg. For a live-log-file = blob_dir/000003.blob // Eg. For a live-log-file = blob_dir/000003.blob
@ -128,30 +130,29 @@ class BlobFile {
} }
// All Get functions which are not atomic, will need ReadLock on the mutex // All Get functions which are not atomic, will need ReadLock on the mutex
tsrange_t GetTimeRange() const {
assert(HasTimestamp()); ExpirationRange GetExpirationRange() const { return expiration_range_; }
return time_range_;
void ExtendExpirationRange(uint64_t expiration) {
expiration_range_.first = std::min(expiration_range_.first, expiration);
expiration_range_.second = std::max(expiration_range_.second, expiration);
} }
ttlrange_t GetTTLRange() const { return ttl_range_; } SequenceRange GetSequenceRange() const { return sequence_range_; }
snrange_t GetSNRange() const { return sn_range_; } void SetSequenceRange(SequenceRange sequence_range) {
sequence_range_ = sequence_range;
bool HasTTL() const {
assert(header_valid_);
return header_.HasTTL();
}
bool HasTimestamp() const {
assert(header_valid_);
return header_.HasTimestamp();
} }
void ExtendSequenceRange(SequenceNumber sequence) { void ExtendSequenceRange(SequenceNumber sequence) {
sn_range_.first = std::min(sn_range_.first, sequence); sequence_range_.first = std::min(sequence_range_.first, sequence);
sn_range_.second = std::max(sn_range_.second, sequence); sequence_range_.second = std::max(sequence_range_.second, sequence);
} }
bool HasTTL() const { return has_ttl_; }
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
std::shared_ptr<Writer> GetWriter() const { return log_writer_; } std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
void Fsync(); void Fsync();
@ -174,11 +175,9 @@ class BlobFile {
// previously closed file // previously closed file
Status SetFromFooterLocked(const BlobLogFooter& footer); Status SetFromFooterLocked(const BlobLogFooter& footer);
void set_time_range(const tsrange_t& tr) { time_range_ = tr; } void set_expiration_range(const ExpirationRange& expiration_range) {
expiration_range_ = expiration_range;
void set_ttl_range(const ttlrange_t& ttl) { ttl_range_ = ttl; } }
void SetSNRange(const snrange_t& snr) { sn_range_ = snr; }
// The following functions are atomic, and don't need locks // The following functions are atomic, and don't need locks
void SetFileSize(uint64_t fs) { file_size_ = fs; } void SetFileSize(uint64_t fs) { file_size_ = fs; }

View File

@ -6,284 +6,145 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
namespace rocksdb { namespace rocksdb {
namespace blob_db { namespace blob_db {
const uint32_t kMagicNumber = 2395959; void BlobLogHeader::EncodeTo(std::string* dst) {
const uint32_t kVersion1 = 1; assert(dst != nullptr);
const size_t kBlockSize = 32768; dst->clear();
dst->reserve(BlobLogHeader::kSize);
BlobLogHeader::BlobLogHeader() PutFixed32(dst, kMagicNumber);
: magic_number_(kMagicNumber), compression_(kNoCompression) {} PutFixed32(dst, version);
PutFixed32(dst, column_family_id);
BlobLogHeader& BlobLogHeader::operator=(BlobLogHeader&& in) noexcept { unsigned char flags = (has_ttl ? 1 : 0);
if (this != &in) { dst->push_back(flags);
magic_number_ = in.magic_number_; dst->push_back(compression);
version_ = in.version_; PutFixed64(dst, expiration_range.first);
ttl_guess_ = std::move(in.ttl_guess_); PutFixed64(dst, expiration_range.second);
ts_guess_ = std::move(in.ts_guess_);
compression_ = in.compression_;
}
return *this;
} }
BlobLogFooter::BlobLogFooter() : magic_number_(kMagicNumber), blob_count_(0) {} Status BlobLogHeader::DecodeFrom(Slice src) {
static const std::string kErrorMessage =
Status BlobLogFooter::DecodeFrom(const Slice& input) { "Error while decoding blob log header";
Slice slice(input); if (src.size() != BlobLogHeader::kSize) {
uint32_t val; return Status::Corruption(kErrorMessage,
if (!GetFixed32(&slice, &val)) { "Unexpected blob file header size");
return Status::Corruption("Invalid Blob Footer: flags");
} }
uint32_t magic_number;
bool has_ttl = false; unsigned char flags;
bool has_ts = false; if (!GetFixed32(&src, &magic_number) || !GetFixed32(&src, &version) ||
val >>= 8; !GetFixed32(&src, &column_family_id)) {
RecordSubType st = static_cast<RecordSubType>(val); return Status::Corruption(
switch (st) { kErrorMessage,
case kRegularType: "Error decoding magic number, version and column family id");
break;
case kTTLType:
has_ttl = true;
break;
case kTimestampType:
has_ts = true;
break;
default:
return Status::Corruption("Invalid Blob Footer: flags_val");
} }
if (magic_number != kMagicNumber) {
if (!GetFixed64(&slice, &blob_count_)) { return Status::Corruption(kErrorMessage, "Magic number mismatch");
return Status::Corruption("Invalid Blob Footer: blob_count");
} }
if (version != kVersion1) {
ttlrange_t temp_ttl; return Status::Corruption(kErrorMessage, "Unknown header version");
if (!GetFixed64(&slice, &temp_ttl.first) ||
!GetFixed64(&slice, &temp_ttl.second)) {
return Status::Corruption("Invalid Blob Footer: ttl_range");
} }
if (has_ttl) { flags = src.data()[0];
ttl_range_.reset(new ttlrange_t(temp_ttl)); compression = static_cast<CompressionType>(src.data()[1]);
has_ttl = (flags & 1) == 1;
src.remove_prefix(2);
if (!GetFixed64(&src, &expiration_range.first) ||
!GetFixed64(&src, &expiration_range.second)) {
return Status::Corruption(kErrorMessage, "Error decoding expiration range");
} }
if (!GetFixed64(&slice, &sn_range_.first) ||
!GetFixed64(&slice, &sn_range_.second)) {
return Status::Corruption("Invalid Blob Footer: sn_range");
}
tsrange_t temp_ts;
if (!GetFixed64(&slice, &temp_ts.first) ||
!GetFixed64(&slice, &temp_ts.second)) {
return Status::Corruption("Invalid Blob Footer: ts_range");
}
if (has_ts) {
ts_range_.reset(new tsrange_t(temp_ts));
}
if (!GetFixed32(&slice, &magic_number_) || magic_number_ != kMagicNumber) {
return Status::Corruption("Invalid Blob Footer: magic");
}
return Status::OK(); return Status::OK();
} }
void BlobLogFooter::EncodeTo(std::string* dst) const { void BlobLogFooter::EncodeTo(std::string* dst) {
dst->reserve(kFooterSize); assert(dst != nullptr);
dst->clear();
RecordType rt = kFullType; dst->reserve(BlobLogFooter::kSize);
RecordSubType st = kRegularType; PutFixed32(dst, kMagicNumber);
if (HasTTL()) { PutFixed64(dst, blob_count);
st = kTTLType; PutFixed64(dst, expiration_range.first);
} else if (HasTimestamp()) { PutFixed64(dst, expiration_range.second);
st = kTimestampType; PutFixed64(dst, sequence_range.first);
} PutFixed64(dst, sequence_range.second);
uint32_t val = static_cast<uint32_t>(rt) | (static_cast<uint32_t>(st) << 8); crc = crc32c::Value(dst->c_str(), dst->size());
PutFixed32(dst, val); crc = crc32c::Mask(crc);
PutFixed32(dst, crc);
PutFixed64(dst, blob_count_);
bool has_ttl = HasTTL();
bool has_ts = HasTimestamp();
if (has_ttl) {
PutFixed64(dst, ttl_range_.get()->first);
PutFixed64(dst, ttl_range_.get()->second);
} else {
PutFixed64(dst, 0);
PutFixed64(dst, 0);
}
PutFixed64(dst, sn_range_.first);
PutFixed64(dst, sn_range_.second);
if (has_ts) {
PutFixed64(dst, ts_range_.get()->first);
PutFixed64(dst, ts_range_.get()->second);
} else {
PutFixed64(dst, 0);
PutFixed64(dst, 0);
}
PutFixed32(dst, magic_number_);
} }
void BlobLogHeader::EncodeTo(std::string* dst) const { Status BlobLogFooter::DecodeFrom(Slice src) {
dst->reserve(kHeaderSize); static const std::string kErrorMessage =
"Error while decoding blob log footer";
PutFixed32(dst, magic_number_); if (src.size() != BlobLogFooter::kSize) {
return Status::Corruption(kErrorMessage,
PutFixed32(dst, version_); "Unexpected blob file footer size");
RecordSubType st = kRegularType;
bool has_ttl = HasTTL();
bool has_ts = HasTimestamp();
if (has_ttl) {
st = kTTLType;
} else if (has_ts) {
st = kTimestampType;
} }
uint32_t val = uint32_t src_crc = 0;
static_cast<uint32_t>(st) | (static_cast<uint32_t>(compression_) << 8); src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - 4);
PutFixed32(dst, val); src_crc = crc32c::Mask(src_crc);
uint32_t magic_number;
if (has_ttl) { if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) ||
PutFixed64(dst, ttl_guess_.get()->first); !GetFixed64(&src, &expiration_range.first) ||
PutFixed64(dst, ttl_guess_.get()->second); !GetFixed64(&src, &expiration_range.second) ||
} else { !GetFixed64(&src, &sequence_range.first) ||
PutFixed64(dst, 0); !GetFixed64(&src, &sequence_range.second) || !GetFixed32(&src, &crc)) {
PutFixed64(dst, 0); return Status::Corruption(kErrorMessage, "Error decoding content");
} }
if (magic_number != kMagicNumber) {
if (has_ts) { return Status::Corruption(kErrorMessage, "Magic number mismatch");
PutFixed64(dst, ts_guess_.get()->first);
PutFixed64(dst, ts_guess_.get()->second);
} else {
PutFixed64(dst, 0);
PutFixed64(dst, 0);
} }
} if (src_crc != crc) {
return Status::Corruption(kErrorMessage, "CRC mismatch");
Status BlobLogHeader::DecodeFrom(const Slice& input) {
Slice slice(input);
if (!GetFixed32(&slice, &magic_number_) || magic_number_ != kMagicNumber) {
return Status::Corruption("Invalid Blob Log Header: magic");
} }
// as of today, we only support 1 version
if (!GetFixed32(&slice, &version_) || version_ != kVersion1) {
return Status::Corruption("Invalid Blob Log Header: version");
}
uint32_t val;
if (!GetFixed32(&slice, &val)) {
return Status::Corruption("Invalid Blob Log Header: subtype");
}
bool has_ttl = false;
bool has_ts = false;
RecordSubType st = static_cast<RecordSubType>(val & 0xff);
compression_ = static_cast<CompressionType>((val >> 8) & 0xff);
switch (st) {
case kRegularType:
break;
case kTTLType:
has_ttl = true;
break;
case kTimestampType:
has_ts = true;
break;
default:
return Status::Corruption("Invalid Blob Log Header: subtype_2");
}
ttlrange_t temp_ttl;
if (!GetFixed64(&slice, &temp_ttl.first) ||
!GetFixed64(&slice, &temp_ttl.second)) {
return Status::Corruption("Invalid Blob Log Header: ttl");
}
if (has_ttl) {
set_ttl_guess(temp_ttl);
}
tsrange_t temp_ts;
if (!GetFixed64(&slice, &temp_ts.first) ||
!GetFixed64(&slice, &temp_ts.second)) {
return Status::Corruption("Invalid Blob Log Header: timestamp");
}
if (has_ts) set_ts_guess(temp_ts);
return Status::OK(); return Status::OK();
} }
BlobLogRecord::BlobLogRecord() void BlobLogRecord::EncodeHeaderTo(std::string* dst) {
: checksum_(0), assert(dst != nullptr);
header_cksum_(0), dst->clear();
key_size_(0), dst->reserve(BlobLogRecord::kHeaderSize + key.size() + value.size());
blob_size_(0), PutFixed64(dst, key.size());
time_val_(0), PutFixed64(dst, value.size());
ttl_val_(0), PutFixed64(dst, expiration);
type_(0), header_crc = crc32c::Value(dst->c_str(), dst->size());
subtype_(0) {} header_crc = crc32c::Mask(header_crc);
PutFixed32(dst, header_crc);
BlobLogRecord::~BlobLogRecord() {} blob_crc = crc32c::Value(key.data(), key.size());
blob_crc = crc32c::Extend(blob_crc, value.data(), value.size());
void BlobLogRecord::ResizeKeyBuffer(size_t kbs) { blob_crc = crc32c::Mask(blob_crc);
if (kbs > key_buffer_.size()) { PutFixed32(dst, blob_crc);
key_buffer_.resize(kbs);
}
} }
void BlobLogRecord::ResizeBlobBuffer(size_t bbs) { Status BlobLogRecord::DecodeHeaderFrom(Slice src) {
if (bbs > blob_buffer_.size()) { static const std::string kErrorMessage = "Error while decoding blob record";
blob_buffer_.resize(bbs); if (src.size() != BlobLogRecord::kHeaderSize) {
return Status::Corruption(kErrorMessage,
"Unexpected blob record header size");
} }
uint32_t src_crc = 0;
src_crc = crc32c::Value(src.data(), BlobLogRecord::kHeaderSize - 8);
src_crc = crc32c::Mask(src_crc);
if (!GetFixed64(&src, &key_size) || !GetFixed64(&src, &value_size) ||
!GetFixed64(&src, &expiration) || !GetFixed32(&src, &header_crc) ||
!GetFixed32(&src, &blob_crc)) {
return Status::Corruption(kErrorMessage, "Error decoding content");
}
if (src_crc != header_crc) {
return Status::Corruption(kErrorMessage, "Header CRC mismatch");
}
return Status::OK();
} }
void BlobLogRecord::Clear() { Status BlobLogRecord::CheckBlobCRC() const {
checksum_ = 0; uint32_t expected_crc = 0;
header_cksum_ = 0; expected_crc = crc32c::Value(key.data(), key.size());
key_size_ = 0; expected_crc = crc32c::Extend(expected_crc, value.data(), value.size());
blob_size_ = 0; expected_crc = crc32c::Mask(expected_crc);
time_val_ = 0; if (expected_crc != blob_crc) {
ttl_val_ = 0; return Status::Corruption("Blob CRC mismatch");
type_ = subtype_ = 0;
key_.clear();
blob_.clear();
}
Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) {
Slice input = hdrslice;
if (input.size() < kHeaderSize) {
return Status::Corruption("Invalid Blob Record Header: size");
} }
if (!GetFixed32(&input, &key_size_)) {
return Status::Corruption("Invalid Blob Record Header: key_size");
}
if (!GetFixed64(&input, &blob_size_)) {
return Status::Corruption("Invalid Blob Record Header: blob_size");
}
if (!GetFixed64(&input, &ttl_val_)) {
return Status::Corruption("Invalid Blob Record Header: ttl_val");
}
if (!GetFixed64(&input, &time_val_)) {
return Status::Corruption("Invalid Blob Record Header: time_val");
}
type_ = *(input.data());
input.remove_prefix(1);
subtype_ = *(input.data());
input.remove_prefix(1);
if (!GetFixed32(&input, &header_cksum_)) {
return Status::Corruption("Invalid Blob Record Header: header_cksum");
}
if (!GetFixed32(&input, &checksum_)) {
return Status::Corruption("Invalid Blob Record Header: checksum");
}
return Status::OK(); return Status::OK();
} }

View File

@ -9,243 +9,113 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <cstddef>
#include <cstdint>
#include <limits> #include <limits>
#include <memory>
#include <string>
#include <utility> #include <utility>
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
namespace rocksdb { namespace rocksdb {
namespace blob_db { namespace blob_db {
class BlobFile;
class BlobDBImpl;
constexpr uint32_t kMagicNumber = 2395959; // 0x00248f37
constexpr uint32_t kVersion1 = 1;
constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max(); constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
enum RecordType : uint8_t { using ExpirationRange = std::pair<uint64_t, uint64_t>;
// Zero is reserved for preallocated files using SequenceRange = std::pair<uint64_t, uint64_t>;
kFullType = 0,
// For fragments // Format of blob log file header (30 bytes):
kFirstType = 1, //
kMiddleType = 2, // +--------------+---------+---------+-------+-------------+-------------------+
kLastType = 3, // | magic number | version | cf id | flags | compression | expiration range |
kMaxRecordType = kLastType // +--------------+---------+---------+-------+-------------+-------------------+
// | Fixed32 | Fixed32 | Fixed32 | char | char | Fixed64 Fixed64 |
// +--------------+---------+---------+-------+-------------+-------------------+
//
// List of flags:
// has_ttl: Whether the file contains TTL data.
//
// Expiration range in the header is a rough range based on
// blob_db_options.ttl_range_secs.
struct BlobLogHeader {
static constexpr size_t kSize = 30;
uint32_t version = kVersion1;
uint32_t column_family_id;
CompressionType compression;
bool has_ttl;
ExpirationRange expiration_range;
void EncodeTo(std::string* dst);
Status DecodeFrom(Slice slice);
}; };
enum RecordSubType : uint8_t { // Format of blob log file footer (48 bytes):
kRegularType = 0, //
kTTLType = 1, // +--------------+------------+-------------------+-------------------+------------+
kTimestampType = 2, // | magic number | blob count | expiration range | sequence range | footer CRC |
// +--------------+------------+-------------------+-------------------+------------+
// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed64 + Fixed64 | Fixed32 |
// +--------------+------------+-------------------+-------------------+------------+
//
// The footer will be present only when the blob file is properly closed.
//
// Unlike the same field in file header, expiration range in the footer is the
// range of smallest and largest expiration of the data in this file.
struct BlobLogFooter {
static constexpr size_t kSize = 48;
uint64_t blob_count;
ExpirationRange expiration_range;
SequenceRange sequence_range;
uint32_t crc;
void EncodeTo(std::string* dst);
Status DecodeFrom(Slice slice);
}; };
extern const uint32_t kMagicNumber; // Blob record format (32 bytes header + key + value):
//
// +------------+--------------+------------+------------+----------+---------+-----------+
// | key length | value length | expiration | header CRC | blob CRC | key | value |
// +------------+--------------+------------+------------+----------+---------+-----------+
// | Fixed64 | Fixed64 | Fixed64 | Fixed32 | Fixed32 | key len | value len |
// +------------+--------------+------------+------------+----------+---------+-----------+
//
// If the file has has_ttl = false, the expiration field is always 0, and the
// blob doesn't have an expiration.
//
// Also note that if compression is used, value is compressed value and value
// length is compressed value length.
//
// Header CRC is the checksum of (key_len + val_len + expiration), while
// blob CRC is the checksum of (key + value).
//
// We could use variable length encoding (Varint64) to save more space, but it
// would make the reader more complicated.
struct BlobLogRecord {
// header include fields up to blob CRC
static constexpr size_t kHeaderSize = 32;
class Reader; uint64_t key_size;
uint64_t value_size;
uint64_t expiration;
uint32_t header_crc;
uint32_t blob_crc;
Slice key;
Slice value;
std::string key_buf;
std::string value_buf;
using ttlrange_t = std::pair<uint64_t, uint64_t>; void EncodeHeaderTo(std::string* dst);
using tsrange_t = std::pair<uint64_t, uint64_t>;
using snrange_t = std::pair<rocksdb::SequenceNumber, rocksdb::SequenceNumber>;
class BlobLogHeader { Status DecodeHeaderFrom(Slice src);
friend class BlobFile;
friend class BlobDBImpl;
private: Status CheckBlobCRC() const;
uint32_t magic_number_ = 0;
uint32_t version_ = 1;
CompressionType compression_;
std::unique_ptr<ttlrange_t> ttl_guess_;
std::unique_ptr<tsrange_t> ts_guess_;
private:
void set_ttl_guess(const ttlrange_t& ttl) {
ttl_guess_.reset(new ttlrange_t(ttl));
}
void set_version(uint32_t v) { version_ = v; }
void set_ts_guess(const tsrange_t& ts) { ts_guess_.reset(new tsrange_t(ts)); }
public:
// magic number + version + flags + ttl guess + timestamp range = 44
static const size_t kHeaderSize = 4 + 4 + 4 + 8 * 2 + 8 * 2;
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& input);
BlobLogHeader();
uint32_t magic_number() const { return magic_number_; }
uint32_t version() const { return version_; }
CompressionType compression() const { return compression_; }
ttlrange_t ttl_range() const {
if (!ttl_guess_) {
return {0, 0};
}
return *ttl_guess_;
}
tsrange_t ts_range() const {
if (!ts_guess_) {
return {0, 0};
}
return *ts_guess_;
}
bool HasTTL() const { return ttl_guess_ != nullptr; }
bool HasTimestamp() const { return ts_guess_ != nullptr; }
BlobLogHeader& operator=(BlobLogHeader&& in) noexcept;
};
// Footer encapsulates the fixed information stored at the tail
// end of every blob log file.
class BlobLogFooter {
friend class BlobFile;
public:
// Use this constructor when you plan to write out the footer using
// EncodeTo(). Never use this constructor with DecodeFrom().
BlobLogFooter();
uint32_t magic_number() const { return magic_number_; }
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& input);
// convert this object to a human readable form
std::string ToString() const;
// footer size = 4 byte magic number
// 8 bytes count
// 8, 8 - ttl range
// 8, 8 - sn range
// 8, 8 - ts range
// = 64
static const size_t kFooterSize = 4 + 4 + 8 + (8 * 2) + (8 * 2) + (8 * 2);
bool HasTTL() const { return !!ttl_range_; }
bool HasTimestamp() const { return !!ts_range_; }
uint64_t GetBlobCount() const { return blob_count_; }
ttlrange_t GetTTLRange() const {
if (ttl_range_) {
*ttl_range_;
}
return {0, 0};
}
tsrange_t GetTimeRange() const {
if (ts_range_) {
return *ts_range_;
}
return {0, 0};
}
const snrange_t& GetSNRange() const { return sn_range_; }
private:
uint32_t magic_number_ = 0;
uint64_t blob_count_ = 0;
std::unique_ptr<ttlrange_t> ttl_range_;
std::unique_ptr<tsrange_t> ts_range_;
snrange_t sn_range_;
private:
void set_ttl_range(const ttlrange_t& ttl) {
ttl_range_.reset(new ttlrange_t(ttl));
}
void set_time_range(const tsrange_t& ts) {
ts_range_.reset(new tsrange_t(ts));
}
};
extern const size_t kBlockSize;
class BlobLogRecord {
friend class Reader;
private:
// this might not be set.
uint32_t checksum_;
uint32_t header_cksum_;
uint32_t key_size_;
uint64_t blob_size_;
uint64_t time_val_;
uint64_t ttl_val_;
char type_;
char subtype_;
Slice key_;
Slice blob_;
std::string key_buffer_;
std::string blob_buffer_;
private:
void Clear();
char* GetKeyBuffer() { return &(key_buffer_[0]); }
char* GetBlobBuffer() { return &(blob_buffer_[0]); }
void ResizeKeyBuffer(size_t kbs);
void ResizeBlobBuffer(size_t bbs);
public:
// Header is
// Key Length ( 4 bytes ),
// Blob Length ( 8 bytes),
// ttl (8 bytes), timestamp (8 bytes),
// type (1 byte), subtype (1 byte)
// header checksum (4 bytes), blob checksum (4 bytes),
// = 42
static const size_t kHeaderSize = 4 + 4 + 8 + 8 + 4 + 8 + 1 + 1;
public:
BlobLogRecord();
~BlobLogRecord();
const Slice& Key() const { return key_; }
const Slice& Blob() const { return blob_; }
uint32_t GetKeySize() const { return key_size_; }
uint64_t GetBlobSize() const { return blob_size_; }
bool HasTTL() const {
return ttl_val_ != std::numeric_limits<uint32_t>::max();
}
uint64_t GetTTL() const { return ttl_val_; }
uint64_t GetTimeVal() const { return time_val_; }
char type() const { return type_; }
char subtype() const { return subtype_; }
uint32_t header_checksum() const { return header_cksum_; }
uint32_t checksum() const { return checksum_; }
Status DecodeHeaderFrom(const Slice& hdrslice);
}; };
} // namespace blob_db } // namespace blob_db

View File

@ -7,10 +7,8 @@
#include "utilities/blob_db/blob_log_reader.h" #include "utilities/blob_db/blob_log_reader.h"
#include <cstdio> #include <algorithm>
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
namespace rocksdb { namespace rocksdb {
@ -18,115 +16,79 @@ namespace blob_db {
Reader::Reader(std::shared_ptr<Logger> info_log, Reader::Reader(std::shared_ptr<Logger> info_log,
unique_ptr<SequentialFileReader>&& _file) unique_ptr<SequentialFileReader>&& _file)
: info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) { : info_log_(info_log), file_(std::move(_file)), buffer_(), next_byte_(0) {}
backing_store_.resize(kBlockSize);
}
Reader::~Reader() {} Status Reader::ReadSlice(uint64_t size, Slice* slice, std::string* buf) {
buf->reserve(size);
Status s = file_->Read(size, slice, &(*buf)[0]);
next_byte_ += size;
if (!s.ok()) {
return s;
}
if (slice->size() != size) {
return Status::Corruption("EOF reached while reading record");
}
return s;
}
Status Reader::ReadHeader(BlobLogHeader* header) { Status Reader::ReadHeader(BlobLogHeader* header) {
assert(file_.get() != nullptr); assert(file_.get() != nullptr);
assert(next_byte_ == 0); assert(next_byte_ == 0);
Status status = Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, &backing_store_);
file_->Read(BlobLogHeader::kHeaderSize, &buffer_, GetReadBuffer()); if (!s.ok()) {
next_byte_ += buffer_.size(); return s;
if (!status.ok()) return status;
if (buffer_.size() != BlobLogHeader::kHeaderSize) {
return Status::IOError("EOF reached before file header");
} }
status = header->DecodeFrom(buffer_); if (buffer_.size() != BlobLogHeader::kSize) {
return status; return Status::Corruption("EOF reached before file header");
}
return header->DecodeFrom(buffer_);
} }
Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level, Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
uint64_t* blob_offset) { uint64_t* blob_offset) {
record->Clear(); Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, &backing_store_);
buffer_.clear(); if (!s.ok()) {
backing_store_[0] = '\0'; return s;
}
Status status =
file_->Read(BlobLogRecord::kHeaderSize, &buffer_, GetReadBuffer());
next_byte_ += buffer_.size();
if (!status.ok()) return status;
if (buffer_.size() != BlobLogRecord::kHeaderSize) { if (buffer_.size() != BlobLogRecord::kHeaderSize) {
return Status::IOError("EOF reached before record header"); return Status::Corruption("EOF reached before record header");
} }
status = record->DecodeHeaderFrom(buffer_); s = record->DecodeHeaderFrom(buffer_);
if (!status.ok()) { if (!s.ok()) {
return status; return s;
} }
uint32_t header_crc = 0; uint64_t kb_size = record->key_size + record->value_size;
uint32_t blob_crc = 0;
size_t crc_data_size = BlobLogRecord::kHeaderSize - 2 * sizeof(uint32_t);
header_crc = crc32c::Extend(header_crc, buffer_.data(), crc_data_size);
uint64_t kb_size = record->GetKeySize() + record->GetBlobSize();
if (blob_offset != nullptr) { if (blob_offset != nullptr) {
*blob_offset = next_byte_ + record->GetKeySize(); *blob_offset = next_byte_ + record->key_size;
} }
switch (level) { switch (level) {
case kReadHeader: case kReadHeader:
file_->Skip(kb_size); file_->Skip(record->key_size + record->value_size);
next_byte_ += kb_size; next_byte_ += kb_size;
break;
case kReadHeaderKey: case kReadHeaderKey:
record->ResizeKeyBuffer(record->GetKeySize()); s = ReadSlice(record->key_size, &record->key, &record->key_buf);
status = file_->Read(record->GetKeySize(), &record->key_, file_->Skip(record->value_size);
record->GetKeyBuffer()); next_byte_ += record->value_size;
next_byte_ += record->key_.size(); break;
if (!status.ok()) return status;
if (record->key_.size() != record->GetKeySize()) {
return Status::IOError("EOF reached before key read");
}
header_crc =
crc32c::Extend(header_crc, record->key_.data(), record->GetKeySize());
header_crc = crc32c::Mask(header_crc);
if (header_crc != record->header_cksum_) {
return Status::Corruption("Record Checksum mismatch: header_cksum");
}
file_->Skip(record->GetBlobSize());
next_byte_ += record->GetBlobSize();
case kReadHeaderKeyBlob: case kReadHeaderKeyBlob:
record->ResizeKeyBuffer(record->GetKeySize()); s = ReadSlice(record->key_size, &record->key, &record->key_buf);
status = file_->Read(record->GetKeySize(), &record->key_, if (s.ok()) {
record->GetKeyBuffer()); s = ReadSlice(record->value_size, &record->value, &record->value_buf);
next_byte_ += record->key_.size();
if (!status.ok()) return status;
if (record->key_.size() != record->GetKeySize()) {
return Status::IOError("EOF reached before key read");
} }
if (s.ok()) {
header_crc = s = record->CheckBlobCRC();
crc32c::Extend(header_crc, record->key_.data(), record->GetKeySize());
header_crc = crc32c::Mask(header_crc);
if (header_crc != record->header_cksum_) {
return Status::Corruption("Record Checksum mismatch: header_cksum");
}
record->ResizeBlobBuffer(record->GetBlobSize());
status = file_->Read(record->GetBlobSize(), &record->blob_,
record->GetBlobBuffer());
next_byte_ += record->blob_.size();
if (!status.ok()) return status;
if (record->blob_.size() != record->GetBlobSize()) {
return Status::IOError("EOF reached during blob read");
}
blob_crc =
crc32c::Extend(blob_crc, record->blob_.data(), record->blob_.size());
blob_crc = crc32c::Mask(blob_crc);
if (blob_crc != record->checksum_) {
return Status::Corruption("Blob Checksum mismatch");
} }
break;
} }
return status; return s;
} }
} // namespace blob_db } // namespace blob_db

View File

@ -7,11 +7,9 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <cstdint>
#include <memory> #include <memory>
#include <string> #include <string>
#include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_format.h"
@ -51,7 +49,11 @@ class Reader {
Reader(std::shared_ptr<Logger> info_log, Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& file); std::unique_ptr<SequentialFileReader>&& file);
~Reader(); ~Reader() = default;
// No copying allowed
Reader(const Reader&) = delete;
Reader& operator=(const Reader&) = delete;
Status ReadHeader(BlobLogHeader* header); Status ReadHeader(BlobLogHeader* header);
@ -64,6 +66,8 @@ class Reader {
Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader, Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
uint64_t* blob_offset = nullptr); uint64_t* blob_offset = nullptr);
Status ReadSlice(uint64_t size, Slice* slice, std::string* buf);
SequentialFileReader* file() { return file_.get(); } SequentialFileReader* file() { return file_.get(); }
void ResetNextByte() { next_byte_ = 0; } void ResetNextByte() { next_byte_ = 0; }
@ -72,9 +76,6 @@ class Reader {
const SequentialFileReader* file_reader() const { return file_.get(); } const SequentialFileReader* file_reader() const { return file_.get(); }
private:
char* GetReadBuffer() { return &(backing_store_[0]); }
private: private:
std::shared_ptr<Logger> info_log_; std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_; const std::unique_ptr<SequentialFileReader> file_;
@ -84,10 +85,6 @@ class Reader {
// which byte to read next. For asserting proper usage // which byte to read next. For asserting proper usage
uint64_t next_byte_; uint64_t next_byte_;
// No copying allowed
Reader(const Reader&) = delete;
Reader& operator=(const Reader&) = delete;
}; };
} // namespace blob_db } // namespace blob_db

View File

@ -10,8 +10,8 @@
#include <string> #include <string>
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h" #include "util/file_reader_writer.h"
#include "utilities/blob_db/blob_log_format.h"
namespace rocksdb { namespace rocksdb {
namespace blob_db { namespace blob_db {
@ -24,18 +24,11 @@ Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
bytes_per_sync_(bpsync), bytes_per_sync_(bpsync),
next_sync_offset_(0), next_sync_offset_(0),
use_fsync_(use_fs), use_fsync_(use_fs),
last_elem_type_(kEtNone) { last_elem_type_(kEtNone) {}
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
}
Writer::~Writer() {}
void Writer::Sync() { dest_->Sync(use_fsync_); } void Writer::Sync() { dest_->Sync(use_fsync_); }
Status Writer::WriteHeader(const BlobLogHeader& header) { Status Writer::WriteHeader(BlobLogHeader& header) {
assert(block_offset_ == 0); assert(block_offset_ == 0);
assert(last_elem_type_ == kEtNone); assert(last_elem_type_ == kEtNone);
std::string str; std::string str;
@ -50,7 +43,7 @@ Status Writer::WriteHeader(const BlobLogHeader& header) {
return s; return s;
} }
Status Writer::AppendFooter(const BlobLogFooter& footer) { Status Writer::AppendFooter(BlobLogFooter& footer) {
assert(block_offset_ != 0); assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
@ -69,13 +62,13 @@ Status Writer::AppendFooter(const BlobLogFooter& footer) {
} }
Status Writer::AddRecord(const Slice& key, const Slice& val, Status Writer::AddRecord(const Slice& key, const Slice& val,
uint64_t* key_offset, uint64_t* blob_offset, uint64_t expiration, uint64_t* key_offset,
uint64_t ttl) { uint64_t* blob_offset) {
assert(block_offset_ != 0); assert(block_offset_ != 0);
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string buf; std::string buf;
ConstructBlobHeader(&buf, key, val, ttl, -1); ConstructBlobHeader(&buf, key, val, expiration);
Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
return s; return s;
@ -87,44 +80,19 @@ Status Writer::AddRecord(const Slice& key, const Slice& val,
assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
std::string buf; std::string buf;
ConstructBlobHeader(&buf, key, val, -1, -1); ConstructBlobHeader(&buf, key, val, 0);
Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
return s; return s;
} }
void Writer::ConstructBlobHeader(std::string* headerbuf, const Slice& key, void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
const Slice& val, uint64_t ttl, int64_t ts) { const Slice& val, uint64_t expiration) {
headerbuf->reserve(BlobLogRecord::kHeaderSize); BlobLogRecord record;
record.key = key;
uint32_t key_size = static_cast<uint32_t>(key.size()); record.value = val;
PutFixed32(headerbuf, key_size); record.expiration = expiration;
PutFixed64(headerbuf, val.size()); record.EncodeHeaderTo(buf);
PutFixed64(headerbuf, ttl);
PutFixed64(headerbuf, ts);
RecordType t = kFullType;
headerbuf->push_back(static_cast<char>(t));
RecordSubType st = kRegularType;
if (ttl != kNoExpiration) {
st = kTTLType;
}
headerbuf->push_back(static_cast<char>(st));
uint32_t header_crc = 0;
header_crc =
crc32c::Extend(header_crc, headerbuf->c_str(), headerbuf->size());
header_crc = crc32c::Extend(header_crc, key.data(), key.size());
header_crc = crc32c::Mask(header_crc);
PutFixed32(headerbuf, header_crc);
uint32_t crc = 0;
// Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, val.data(), val.size());
crc = crc32c::Mask(crc); // Adjust for storage
PutFixed32(headerbuf, crc);
} }
Status Writer::EmitPhysicalRecord(const std::string& headerbuf, Status Writer::EmitPhysicalRecord(const std::string& headerbuf,

View File

@ -37,24 +37,29 @@ class Writer {
explicit Writer(std::unique_ptr<WritableFileWriter>&& dest, explicit Writer(std::unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, uint64_t bpsync, bool use_fsync, uint64_t log_number, uint64_t bpsync, bool use_fsync,
uint64_t boffset = 0); uint64_t boffset = 0);
~Writer();
static void ConstructBlobHeader(std::string* headerbuf, const Slice& key, ~Writer() = default;
const Slice& val, uint64_t ttl, int64_t ts);
// No copying allowed
Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;
static void ConstructBlobHeader(std::string* buf, const Slice& key,
const Slice& val, uint64_t expiration);
Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset, Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset); uint64_t* blob_offset);
Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset, Status AddRecord(const Slice& key, const Slice& val, uint64_t expiration,
uint64_t* blob_offset, uint64_t ttl); uint64_t* key_offset, uint64_t* blob_offset);
Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key, Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key,
const Slice& val, uint64_t* key_offset, const Slice& val, uint64_t* key_offset,
uint64_t* blob_offset); uint64_t* blob_offset);
Status AppendFooter(const BlobLogFooter& footer); Status AppendFooter(BlobLogFooter& footer);
Status WriteHeader(const BlobLogHeader& header); Status WriteHeader(BlobLogHeader& header);
WritableFileWriter* file() { return dest_.get(); } WritableFileWriter* file() { return dest_.get(); }
@ -76,15 +81,6 @@ class Writer {
uint64_t next_sync_offset_; uint64_t next_sync_offset_;
bool use_fsync_; bool use_fsync_;
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
// No copying allowed
Writer(const Writer&) = delete;
Writer& operator=(const Writer&) = delete;
public: public:
enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter }; enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
ElemType last_elem_type_; ElemType last_elem_type_;