rocksdb/utilities/blob_db/blob_log_format.cc
Yi Wu 0552029b5c Blob DB: not writing sequence number as blob record footer
Summary:
Previously, each time we wrote a blob we wrote blob_record_header + key + value + blob_record_footer to the blob log. The footer only contained a sequence number and a crc for the sequence number. The sequence number was used in garbage collection to verify that the value is recent. After #2703 we moved to optimistic transactions and no longer use the sequence number from the footer. Remove the footer altogether.

There's another usage of the sequence number, and we are keeping it: each blob log file keeps track of the sequence number range of the keys in it, and uses it to check whether the file is referenced by a snapshot before it is deleted.
Closes https://github.com/facebook/rocksdb/pull/3005

Differential Revision: D6057585

Pulled By: yiwu-arbug

fbshipit-source-id: d6da53c457a316e9723f359a1b47facfc3ffe090
2017-10-17 12:13:08 -07:00

293 lines
7.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_format.h"
#include "util/coding.h"
#include "util/crc32c.h"
namespace rocksdb {
namespace blob_db {
// Magic number written as the first field of an encoded blob log header and
// as the last field of an encoded blob log footer; the decoders in this file
// reject any other value as corruption.
const uint32_t kMagicNumber = 2395959;
// The only format version BlobLogHeader::DecodeFrom accepts.
const uint32_t kVersion1 = 1;
// NOTE(review): not referenced in this file; presumably a block size in bytes
// (32 KiB) for the blob log reader/writer -- confirm against its users.
const size_t kBlockSize = 32768;
BlobLogHeader::BlobLogHeader()
: magic_number_(kMagicNumber), compression_(kNoCompression) {}
// Move assignment. Takes over the TTL / timestamp guesses held by `in` and
// copies its scalar fields (magic number, version, compression).
// Self-assignment is a no-op. Returns *this.
BlobLogHeader& BlobLogHeader::operator=(BlobLogHeader&& in) noexcept {
  if (this == &in) {
    return *this;
  }

  // Owning members are moved out of the source...
  ttl_guess_ = std::move(in.ttl_guess_);
  ts_guess_ = std::move(in.ts_guess_);

  // ...while plain values are simply copied.
  magic_number_ = in.magic_number_;
  version_ = in.version_;
  compression_ = in.compression_;

  return *this;
}
BlobLogFooter::BlobLogFooter() : magic_number_(kMagicNumber), blob_count_(0) {}
// Parses a serialized blob log footer from `input` into this object.
//
// Fields are consumed in this order:
//   flags       (fixed32; the subtype is whatever remains after dropping the
//                low 8 bits)
//   blob_count  (fixed64)
//   ttl_range   (2 x fixed64, always present in the encoding)
//   sn_range    (2 x fixed64)
//   ts_range    (2 x fixed64, always present in the encoding)
//   magic       (fixed32, must equal kMagicNumber)
//
// The TTL range is kept only when the subtype is kTTLType and the timestamp
// range only when it is kTimestampType; otherwise the corresponding member is
// cleared, so decoding into a previously used footer cannot leave a stale
// range behind.
//
// Returns Status::OK() on success, or Status::Corruption naming the first
// field that could not be read or validated. Members decoded before the
// failing field have already been overwritten at that point.
Status BlobLogFooter::DecodeFrom(const Slice& input) {
  Slice slice(input);

  uint32_t val;
  if (!GetFixed32(&slice, &val)) {
    return Status::Corruption("Invalid Blob Footer: flags");
  }

  bool has_ttl = false;
  bool has_ts = false;
  // EncodeTo() stores the record type in the low 8 bits and the subtype above
  // them; only the subtype is interpreted here.
  val >>= 8;
  RecordSubType st = static_cast<RecordSubType>(val);
  switch (st) {
    case kRegularType:
      break;
    case kTTLType:
      has_ttl = true;
      break;
    case kTimestampType:
      has_ts = true;
      break;
    default:
      return Status::Corruption("Invalid Blob Footer: flags_val");
  }

  if (!GetFixed64(&slice, &blob_count_)) {
    return Status::Corruption("Invalid Blob Footer: blob_count");
  }

  ttlrange_t temp_ttl;
  if (!GetFixed64(&slice, &temp_ttl.first) ||
      !GetFixed64(&slice, &temp_ttl.second)) {
    return Status::Corruption("Invalid Blob Footer: ttl_range");
  }
  if (has_ttl) {
    ttl_range_.reset(new ttlrange_t(temp_ttl));
  } else {
    // Drop any range left over from an earlier decode or setter call.
    ttl_range_.reset();
  }

  if (!GetFixed64(&slice, &sn_range_.first) ||
      !GetFixed64(&slice, &sn_range_.second)) {
    return Status::Corruption("Invalid Blob Footer: sn_range");
  }

  tsrange_t temp_ts;
  if (!GetFixed64(&slice, &temp_ts.first) ||
      !GetFixed64(&slice, &temp_ts.second)) {
    return Status::Corruption("Invalid Blob Footer: ts_range");
  }
  if (has_ts) {
    ts_range_.reset(new tsrange_t(temp_ts));
  } else {
    // Same reasoning as for ttl_range_ above.
    ts_range_.reset();
  }

  if (!GetFixed32(&slice, &magic_number_) || magic_number_ != kMagicNumber) {
    return Status::Corruption("Invalid Blob Footer: magic");
  }

  return Status::OK();
}
// Appends the serialized footer to `*dst`.
//
// Fields are written in this order:
//   flags       (fixed32: record type in the low 8 bits, subtype shifted
//                left by 8)
//   blob_count  (fixed64)
//   ttl_range   (2 x fixed64; zeros when the footer has no TTL range)
//   sn_range    (2 x fixed64)
//   ts_range    (2 x fixed64; zeros when the footer has no timestamp range)
//   magic       (fixed32)
//
// When both a TTL range and a timestamp range are present, the subtype is
// kTTLType, but both ranges are still written.
void BlobLogFooter::EncodeTo(std::string* dst) const {
  // reserve() takes the total capacity, so include the bytes already in *dst;
  // otherwise the reservation is too small whenever dst is not empty.
  dst->reserve(dst->size() + kFooterSize);

  const bool has_ttl = HasTTL();
  const bool has_ts = HasTimestamp();

  RecordType rt = kFullType;
  RecordSubType st = kRegularType;
  if (has_ttl) {
    st = kTTLType;
  } else if (has_ts) {
    st = kTimestampType;
  }
  uint32_t val = static_cast<uint32_t>(rt) | (static_cast<uint32_t>(st) << 8);
  PutFixed32(dst, val);

  PutFixed64(dst, blob_count_);

  // The range slots are always emitted so the footer has a fixed layout.
  if (has_ttl) {
    PutFixed64(dst, ttl_range_.get()->first);
    PutFixed64(dst, ttl_range_.get()->second);
  } else {
    PutFixed64(dst, 0);
    PutFixed64(dst, 0);
  }

  PutFixed64(dst, sn_range_.first);
  PutFixed64(dst, sn_range_.second);

  if (has_ts) {
    PutFixed64(dst, ts_range_.get()->first);
    PutFixed64(dst, ts_range_.get()->second);
  } else {
    PutFixed64(dst, 0);
    PutFixed64(dst, 0);
  }

  PutFixed32(dst, magic_number_);
}
// Appends the serialized header to `*dst`.
//
// Fields are written in this order:
//   magic       (fixed32)
//   version     (fixed32)
//   flags       (fixed32: subtype in the low 8 bits, compression type shifted
//                left by 8)
//   ttl_guess   (2 x fixed64; zeros when the header has no TTL guess)
//   ts_guess    (2 x fixed64; zeros when the header has no timestamp guess)
//
// When both a TTL guess and a timestamp guess are present, the subtype is
// kTTLType, but both guesses are still written.
void BlobLogHeader::EncodeTo(std::string* dst) const {
  // reserve() takes the total capacity, so include the bytes already in *dst;
  // otherwise the reservation is too small whenever dst is not empty.
  dst->reserve(dst->size() + kHeaderSize);

  PutFixed32(dst, magic_number_);
  PutFixed32(dst, version_);

  const bool has_ttl = HasTTL();
  const bool has_ts = HasTimestamp();
  RecordSubType st = kRegularType;
  if (has_ttl) {
    st = kTTLType;
  } else if (has_ts) {
    st = kTimestampType;
  }
  uint32_t val =
      static_cast<uint32_t>(st) | (static_cast<uint32_t>(compression_) << 8);
  PutFixed32(dst, val);

  // The guess slots are always emitted so the header has a fixed layout.
  if (has_ttl) {
    PutFixed64(dst, ttl_guess_.get()->first);
    PutFixed64(dst, ttl_guess_.get()->second);
  } else {
    PutFixed64(dst, 0);
    PutFixed64(dst, 0);
  }

  if (has_ts) {
    PutFixed64(dst, ts_guess_.get()->first);
    PutFixed64(dst, ts_guess_.get()->second);
  } else {
    PutFixed64(dst, 0);
    PutFixed64(dst, 0);
  }
}
// Parses a serialized blob log header from `input` into this object.
//
// Fields are consumed in this order:
//   magic       (fixed32, must equal kMagicNumber)
//   version     (fixed32, must equal kVersion1)
//   flags       (fixed32: subtype in bits 0-7, compression type in bits 8-15)
//   ttl_guess   (2 x fixed64, always present in the encoding)
//   ts_guess    (2 x fixed64, always present in the encoding)
//
// The TTL guess is kept only when the subtype is kTTLType and the timestamp
// guess only when it is kTimestampType; otherwise the corresponding member is
// cleared, so decoding into a previously used header cannot leave a stale
// guess behind.
//
// Returns Status::OK() on success, or Status::Corruption naming the first
// field that could not be read or validated. Members decoded before the
// failing field have already been overwritten at that point.
Status BlobLogHeader::DecodeFrom(const Slice& input) {
  Slice slice(input);

  if (!GetFixed32(&slice, &magic_number_) || magic_number_ != kMagicNumber) {
    return Status::Corruption("Invalid Blob Log Header: magic");
  }

  // as of today, we only support 1 version
  if (!GetFixed32(&slice, &version_) || version_ != kVersion1) {
    return Status::Corruption("Invalid Blob Log Header: version");
  }

  uint32_t val;
  if (!GetFixed32(&slice, &val)) {
    return Status::Corruption("Invalid Blob Log Header: subtype");
  }

  bool has_ttl = false;
  bool has_ts = false;
  RecordSubType st = static_cast<RecordSubType>(val & 0xff);
  compression_ = static_cast<CompressionType>((val >> 8) & 0xff);
  switch (st) {
    case kRegularType:
      break;
    case kTTLType:
      has_ttl = true;
      break;
    case kTimestampType:
      has_ts = true;
      break;
    default:
      return Status::Corruption("Invalid Blob Log Header: subtype_2");
  }

  ttlrange_t temp_ttl;
  if (!GetFixed64(&slice, &temp_ttl.first) ||
      !GetFixed64(&slice, &temp_ttl.second)) {
    return Status::Corruption("Invalid Blob Log Header: ttl");
  }
  if (has_ttl) {
    set_ttl_guess(temp_ttl);
  } else {
    // Drop any guess left over from an earlier decode or setter call.
    ttl_guess_.reset();
  }

  tsrange_t temp_ts;
  if (!GetFixed64(&slice, &temp_ts.first) ||
      !GetFixed64(&slice, &temp_ts.second)) {
    return Status::Corruption("Invalid Blob Log Header: timestamp");
  }
  if (has_ts) {
    set_ts_guess(temp_ts);
  } else {
    // Same reasoning as for ttl_guess_ above.
    ts_guess_.reset();
  }

  return Status::OK();
}
BlobLogRecord::BlobLogRecord()
: checksum_(0),
header_cksum_(0),
key_size_(0),
blob_size_(0),
time_val_(0),
ttl_val_(0),
type_(0),
subtype_(0) {}
// Nothing to release by hand; members clean up after themselves.
BlobLogRecord::~BlobLogRecord() = default;
// Grows the key buffer to `kbs` elements if it is currently smaller.
// The buffer is never shrunk.
void BlobLogRecord::ResizeKeyBuffer(size_t kbs) {
  const bool already_large_enough = key_buffer_.size() >= kbs;
  if (already_large_enough) {
    return;
  }
  key_buffer_.resize(kbs);
}
// Grows the blob buffer to `bbs` elements if it is currently smaller.
// The buffer is never shrunk.
void BlobLogRecord::ResizeBlobBuffer(size_t bbs) {
  const bool already_large_enough = blob_buffer_.size() >= bbs;
  if (already_large_enough) {
    return;
  }
  blob_buffer_.resize(bbs);
}
// Resets the record to its empty state: zeroes every header field and clears
// the key and blob. The key / blob buffers themselves are not touched here.
void BlobLogRecord::Clear() {
  // Payload.
  key_.clear();
  blob_.clear();

  // Header fields.
  key_size_ = 0;
  blob_size_ = 0;
  ttl_val_ = 0;
  time_val_ = 0;
  subtype_ = 0;
  type_ = 0;
  header_cksum_ = 0;
  checksum_ = 0;
}
// Parses a blob record header from `hdrslice` into this record.
//
// Fields are consumed in this order:
//   key_size (fixed32), blob_size (fixed64), ttl_val (fixed64),
//   time_val (fixed64), type (1 byte), subtype (1 byte),
//   header_cksum (fixed32), checksum (fixed32)
//
// Returns Status::Corruption if the slice is shorter than kHeaderSize or a
// field cannot be read, Status::OK() otherwise. The checksums are only stored
// here, not verified.
Status BlobLogRecord::DecodeHeaderFrom(const Slice& hdrslice) {
  if (hdrslice.size() < kHeaderSize) {
    return Status::Corruption("Invalid Blob Record Header: size");
  }

  // Work on a copy so the caller's slice is left untouched.
  Slice cursor(hdrslice);

  if (!GetFixed32(&cursor, &key_size_)) {
    return Status::Corruption("Invalid Blob Record Header: key_size");
  }

  if (!GetFixed64(&cursor, &blob_size_)) {
    return Status::Corruption("Invalid Blob Record Header: blob_size");
  }

  if (!GetFixed64(&cursor, &ttl_val_)) {
    return Status::Corruption("Invalid Blob Record Header: ttl_val");
  }

  if (!GetFixed64(&cursor, &time_val_)) {
    return Status::Corruption("Invalid Blob Record Header: time_val");
  }

  // The two single-byte fields are read straight from the buffer without a
  // per-field bounds check; presumably the kHeaderSize check above is what
  // guarantees these bytes exist -- confirm against the header layout.
  type_ = cursor.data()[0];
  cursor.remove_prefix(1);
  subtype_ = cursor.data()[0];
  cursor.remove_prefix(1);

  if (!GetFixed32(&cursor, &header_cksum_)) {
    return Status::Corruption("Invalid Blob Record Header: header_cksum");
  }

  if (!GetFixed32(&cursor, &checksum_)) {
    return Status::Corruption("Invalid Blob Record Header: checksum");
  }

  return Status::OK();
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE