rocksdb/utilities/blob_db/blob_log_writer.cc
Yi Wu 419b93c56f Blob DB: not writing sequence number as blob record footer
Summary:
Previously, each time we wrote a blob we wrote blob_record_header + key + value + blob_record_footer to the blob log. The footer only contained a sequence number and a crc for the sequence number. The sequence number was used in garbage collection to verify that the value is recent. After #2703 we moved to using optimistic transactions and no longer use the sequence number from the footer. Remove the footer altogether.

There's another usage of the sequence number and we are keeping it: each blob log file keeps track of the sequence number range of the keys in it, and uses it to check whether the file is referenced by a snapshot before it is deleted.
Closes https://github.com/facebook/rocksdb/pull/3005

Differential Revision: D6057585

Pulled By: yiwu-arbug

fbshipit-source-id: d6da53c457a316e9723f359a1b47facfc3ffe090
2017-11-02 23:07:27 -07:00

154 lines
4.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_log_writer.h"
#include <cstdint>
#include <string>
#include "rocksdb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
namespace rocksdb {
namespace blob_db {
// Creates a blob log writer that takes over ownership of `dest`.
//
// dest       - file writer that all appends/flushes/syncs are issued to.
// log_number - stored in log_number_.
// bpsync     - stored in bytes_per_sync_.
// use_fs     - stored in use_fsync_ and later forwarded to dest_->Sync().
// boffset    - initial value of block_offset_, the running write offset.
//
// The writer starts with no element written (last_elem_type_ == kEtNone).
Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
               uint64_t bpsync, bool use_fs, uint64_t boffset)
    : dest_(std::move(dest)),
      log_number_(log_number),
      block_offset_(boffset),
      bytes_per_sync_(bpsync),
      next_sync_offset_(0),
      use_fsync_(use_fs),
      last_elem_type_(kEtNone) {
  // Precompute the crc32c of every one-byte record type value in
  // [0, kMaxRecordType] and cache it in type_crc_.
  int type = 0;
  while (type <= kMaxRecordType) {
    const char type_byte = static_cast<char>(type);
    type_crc_[type] = crc32c::Value(&type_byte, 1);
    ++type;
  }
}
// Nothing to release explicitly; members clean up after themselves.
Writer::~Writer() = default;
// Requests a sync of the underlying file writer, forwarding use_fsync_.
// NOTE(review): this function returns void, so whatever dest_->Sync()
// reports is not propagated to the caller.
void Writer::Sync() {
  dest_->Sync(use_fsync_);
}
// Writes the encoded file header as the first element of the blob log.
//
// Must be called before anything else has been written: block_offset_ has to
// be 0 and no element may have been emitted yet (both checked by assert).
//
// Returns the status of the append, or of the subsequent flush when the
// append succeeded. last_elem_type_ becomes kEtFileHdr regardless of the
// returned status.
Status Writer::WriteHeader(const BlobLogHeader& header) {
  assert(block_offset_ == 0);
  assert(last_elem_type_ == kEtNone);

  std::string encoded_header;
  header.EncodeTo(&encoded_header);

  Status status = dest_->Append(Slice(encoded_header));
  if (status.ok()) {
    // Only account for the bytes and flush once the append went through.
    block_offset_ += encoded_header.size();
    status = dest_->Flush();
  }

  last_elem_type_ = kEtFileHdr;
  return status;
}
// Appends the encoded file footer and closes the blob log.
//
// Requires that something has already been written (block_offset_ != 0) and
// that the previous element was the file header or a record (both checked by
// assert).
//
// When the append succeeds, block_offset_ is advanced, the file writer is
// closed and dest_ is released; the returned status is then the result of
// Close(). When the append fails, its status is returned and dest_ is left
// untouched. last_elem_type_ becomes kEtFileFooter in either case.
Status Writer::AppendFooter(const BlobLogFooter& footer) {
  assert(block_offset_ != 0);
  assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);

  std::string encoded_footer;
  footer.EncodeTo(&encoded_footer);

  Status status = dest_->Append(Slice(encoded_footer));
  if (status.ok()) {
    block_offset_ += encoded_footer.size();
    status = dest_->Close();
    dest_.reset();
  }

  last_elem_type_ = kEtFileFooter;
  return status;
}
// Appends one key/value record carrying an explicit ttl.
//
// key, val    - payload to write.
// key_offset  - out: file offset at which the key bytes start.
// blob_offset - out: file offset at which the value bytes start.
// ttl         - value stored in the record header's ttl field.
//
// The timestamp field of the header is always written as -1.
// Must follow the file header or another record (checked by assert).
// Returns the status reported by EmitPhysicalRecord().
Status Writer::AddRecord(const Slice& key, const Slice& val,
                         uint64_t* key_offset, uint64_t* blob_offset,
                         uint64_t ttl) {
  assert(block_offset_ != 0);
  assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);

  std::string record_header;
  ConstructBlobHeader(&record_header, key, val, ttl, -1);
  return EmitPhysicalRecord(record_header, key, val, key_offset, blob_offset);
}
// Appends one key/value record without an explicit ttl.
//
// key, val    - payload to write.
// key_offset  - out: file offset at which the key bytes start.
// blob_offset - out: file offset at which the value bytes start.
//
// Both the ttl and the timestamp header fields are passed as -1; for the
// unsigned ttl parameter that converts to the largest uint64_t value.
// NOTE(review): presumably that equals kNoExpiration, so the record gets the
// regular (non-TTL) subtype - confirm against the kNoExpiration definition.
// Must follow the file header or another record (checked by assert).
// Returns the status reported by EmitPhysicalRecord().
Status Writer::AddRecord(const Slice& key, const Slice& val,
                         uint64_t* key_offset, uint64_t* blob_offset) {
  assert(block_offset_ != 0);
  assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);

  std::string record_header;
  ConstructBlobHeader(&record_header, key, val, -1, -1);
  return EmitPhysicalRecord(record_header, key, val, key_offset, blob_offset);
}
void Writer::ConstructBlobHeader(std::string* headerbuf, const Slice& key,
const Slice& val, uint64_t ttl, int64_t ts) {
headerbuf->reserve(BlobLogRecord::kHeaderSize);
uint32_t key_size = static_cast<uint32_t>(key.size());
PutFixed32(headerbuf, key_size);
PutFixed64(headerbuf, val.size());
PutFixed64(headerbuf, ttl);
PutFixed64(headerbuf, ts);
RecordType t = kFullType;
headerbuf->push_back(static_cast<char>(t));
RecordSubType st = kRegularType;
if (ttl != kNoExpiration) {
st = kTTLType;
}
headerbuf->push_back(static_cast<char>(st));
uint32_t header_crc = 0;
header_crc =
crc32c::Extend(header_crc, headerbuf->c_str(), headerbuf->size());
header_crc = crc32c::Extend(header_crc, key.data(), key.size());
header_crc = crc32c::Mask(header_crc);
PutFixed32(headerbuf, header_crc);
uint32_t crc = 0;
// Compute the crc of the record type and the payload.
crc = crc32c::Extend(crc, val.data(), val.size());
crc = crc32c::Mask(crc); // Adjust for storage
PutFixed32(headerbuf, crc);
}
// Writes an already-built record header followed by the key and the value,
// then flushes the file writer.
//
// headerbuf   - serialized record header.
// key, val    - payload appended right after the header.
// key_offset  - out: block_offset_ (before this call) + the fixed header size.
// blob_offset - out: *key_offset + key size.
//
// Each step runs only if the previous one succeeded; the first failing status
// (or OK) is returned. The out-parameters, block_offset_ and last_elem_type_
// are updated even when a write step failed.
Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
                                  const Slice& key, const Slice& val,
                                  uint64_t* key_offset, uint64_t* blob_offset) {
  Status status = dest_->Append(Slice(headerbuf));
  if (status.ok()) {
    status = dest_->Append(key);
    if (status.ok()) {
      status = dest_->Append(val);
      if (status.ok()) {
        status = dest_->Flush();
      }
    }
  }

  // Offsets are derived from the running offset, not from the file itself.
  const uint64_t key_start = block_offset_ + BlobLogRecord::kHeaderSize;
  const uint64_t value_start = key_start + key.size();
  *key_offset = key_start;
  *blob_offset = value_start;
  block_offset_ = value_start + val.size();

  last_elem_type_ = kEtRecord;
  return status;
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE