Blob DB: Inline small values in base DB
Summary: Adding the `min_blob_size` option to allow storing small values (values smaller than `min_blob_size`) in base db (in LSM tree) together with the key. The goal is to improve performance for small values, while taking advantage of blob db's low write amplification for large values. Also adding an expiration timestamp to the blob index, which will be useful for evicting stale blob indexes in base db by adding a compaction filter. I'll work on the compaction filter in future patches. See blob_index.h for the new blob index format. There are 4 cases when writing a new key: * small value w/o TTL: put in base db as normal value (i.e. ValueType::kTypeValue). * small value w/ TTL: put (type, expiration, value) to base db. * large value w/o TTL: write value to blob log and put (type, file, offset, size, compression) to base db. * large value w/ TTL: write value to blob log and put (type, expiration, file, offset, size, compression) to base db. Closes https://github.com/facebook/rocksdb/pull/3066 Differential Revision: D6142115 Pulled By: yiwu-arbug fbshipit-source-id: 9526e76e19f0839310a3f5f2a43772a4ad182cd0
This commit is contained in:
parent
05d5c575ac
commit
d66bb21e18
@ -1588,7 +1588,7 @@ bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
|
|||||||
if (snapshots_.empty()) {
|
if (snapshots_.empty()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return (snapshots_.newest()->GetSequenceNumber() > sn);
|
return (snapshots_.newest()->GetSequenceNumber() >= sn);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
|
@ -16,6 +16,8 @@ namespace rocksdb {
|
|||||||
// store multiple versions of a same user key due to snapshots, compaction not
|
// store multiple versions of a same user key due to snapshots, compaction not
|
||||||
// happening yet, etc.
|
// happening yet, etc.
|
||||||
struct KeyVersion {
|
struct KeyVersion {
|
||||||
|
KeyVersion() : user_key(""), value(""), sequence(0), type(0) {}
|
||||||
|
|
||||||
KeyVersion(const std::string& _user_key, const std::string& _value,
|
KeyVersion(const std::string& _user_key, const std::string& _value,
|
||||||
SequenceNumber _sequence, int _type)
|
SequenceNumber _sequence, int _type)
|
||||||
: user_key(_user_key), value(_value), sequence(_sequence), type(_type) {}
|
: user_key(_user_key), value(_value), sequence(_sequence), type(_type) {}
|
||||||
|
@ -52,6 +52,10 @@ struct BlobDBOptions {
|
|||||||
// and so on
|
// and so on
|
||||||
uint64_t ttl_range_secs = 3600;
|
uint64_t ttl_range_secs = 3600;
|
||||||
|
|
||||||
|
// The smallest value to store in blob log. Values smaller than this threshold
|
||||||
|
// will be inlined in base DB together with the key.
|
||||||
|
uint64_t min_blob_size = 0;
|
||||||
|
|
||||||
// at what bytes will the blob files be synced to blob log.
|
// at what bytes will the blob files be synced to blob log.
|
||||||
uint64_t bytes_per_sync = 0;
|
uint64_t bytes_per_sync = 0;
|
||||||
|
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
#include "util/timer_queue.h"
|
#include "util/timer_queue.h"
|
||||||
#include "utilities/blob_db/blob_db_iterator.h"
|
#include "utilities/blob_db/blob_db_iterator.h"
|
||||||
|
#include "utilities/blob_db/blob_index.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
int kBlockBasedTableVersionFormat = 2;
|
int kBlockBasedTableVersionFormat = 2;
|
||||||
@ -49,78 +50,8 @@ void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) {
|
|||||||
} // end namespace
|
} // end namespace
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
namespace blob_db {
|
namespace blob_db {
|
||||||
|
|
||||||
// BlobHandle is a pointer to the blob that is stored in the LSM
|
|
||||||
class BlobHandle {
|
|
||||||
public:
|
|
||||||
BlobHandle()
|
|
||||||
: file_number_(std::numeric_limits<uint64_t>::max()),
|
|
||||||
offset_(std::numeric_limits<uint64_t>::max()),
|
|
||||||
size_(std::numeric_limits<uint64_t>::max()),
|
|
||||||
compression_(kNoCompression) {}
|
|
||||||
|
|
||||||
uint64_t filenumber() const { return file_number_; }
|
|
||||||
void set_filenumber(uint64_t fn) { file_number_ = fn; }
|
|
||||||
|
|
||||||
// The offset of the block in the file.
|
|
||||||
uint64_t offset() const { return offset_; }
|
|
||||||
void set_offset(uint64_t _offset) { offset_ = _offset; }
|
|
||||||
|
|
||||||
// The size of the stored block
|
|
||||||
uint64_t size() const { return size_; }
|
|
||||||
void set_size(uint64_t _size) { size_ = _size; }
|
|
||||||
|
|
||||||
CompressionType compression() const { return compression_; }
|
|
||||||
void set_compression(CompressionType t) { compression_ = t; }
|
|
||||||
|
|
||||||
void EncodeTo(std::string* dst) const;
|
|
||||||
|
|
||||||
Status DecodeFrom(const Slice& input);
|
|
||||||
|
|
||||||
void clear();
|
|
||||||
|
|
||||||
private:
|
|
||||||
uint64_t file_number_;
|
|
||||||
uint64_t offset_;
|
|
||||||
uint64_t size_;
|
|
||||||
CompressionType compression_;
|
|
||||||
};
|
|
||||||
|
|
||||||
void BlobHandle::EncodeTo(std::string* dst) const {
|
|
||||||
// Sanity check that all fields have been set
|
|
||||||
assert(offset_ != std::numeric_limits<uint64_t>::max());
|
|
||||||
assert(size_ != std::numeric_limits<uint64_t>::max());
|
|
||||||
assert(file_number_ != std::numeric_limits<uint64_t>::max());
|
|
||||||
|
|
||||||
dst->reserve(30);
|
|
||||||
PutVarint64(dst, file_number_);
|
|
||||||
PutVarint64(dst, offset_);
|
|
||||||
PutVarint64(dst, size_);
|
|
||||||
dst->push_back(static_cast<unsigned char>(compression_));
|
|
||||||
}
|
|
||||||
|
|
||||||
void BlobHandle::clear() {
|
|
||||||
file_number_ = std::numeric_limits<uint64_t>::max();
|
|
||||||
offset_ = std::numeric_limits<uint64_t>::max();
|
|
||||||
size_ = std::numeric_limits<uint64_t>::max();
|
|
||||||
compression_ = kNoCompression;
|
|
||||||
}
|
|
||||||
|
|
||||||
Status BlobHandle::DecodeFrom(const Slice& input) {
|
|
||||||
Slice s(input);
|
|
||||||
Slice* p = &s;
|
|
||||||
if (GetVarint64(p, &file_number_) && GetVarint64(p, &offset_) &&
|
|
||||||
GetVarint64(p, &size_)) {
|
|
||||||
compression_ = static_cast<CompressionType>(p->data()[0]);
|
|
||||||
return Status::OK();
|
|
||||||
} else {
|
|
||||||
clear();
|
|
||||||
return Status::Corruption("bad blob handle");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Random blob_rgen(static_cast<uint32_t>(time(nullptr)));
|
Random blob_rgen(static_cast<uint32_t>(time(nullptr)));
|
||||||
|
|
||||||
void BlobDBFlushBeginListener::OnFlushBegin(DB* db, const FlushJobInfo& info) {
|
void BlobDBFlushBeginListener::OnFlushBegin(DB* db, const FlushJobInfo& info) {
|
||||||
@ -149,19 +80,20 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
|
|||||||
if (!is_new &&
|
if (!is_new &&
|
||||||
value_type ==
|
value_type ==
|
||||||
CompactionEventListener::CompactionListenerValueType::kValue) {
|
CompactionEventListener::CompactionListenerValueType::kValue) {
|
||||||
BlobHandle handle;
|
BlobIndex blob_index;
|
||||||
Status s = handle.DecodeFrom(existing_value);
|
Status s = blob_index.DecodeFrom(existing_value);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
if (impl_->debug_level_ >= 3)
|
if (impl_->debug_level_ >= 3)
|
||||||
ROCKS_LOG_INFO(impl_->db_options_.info_log,
|
ROCKS_LOG_INFO(
|
||||||
"CALLBACK COMPACTED OUT KEY: %s SN: %d "
|
impl_->db_options_.info_log,
|
||||||
"NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64
|
"CALLBACK COMPACTED OUT KEY: %s SN: %d "
|
||||||
" SIZE: %" PRIu64,
|
"NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64 " SIZE: %" PRIu64,
|
||||||
key.ToString().c_str(), sn, is_new, handle.filenumber(),
|
key.ToString().c_str(), sn, is_new, blob_index.file_number(),
|
||||||
handle.offset(), handle.size());
|
blob_index.offset(), blob_index.size());
|
||||||
|
|
||||||
impl_->override_vals_q_.enqueue({handle.filenumber(), key.size(),
|
impl_->override_vals_q_.enqueue({blob_index.file_number(), key.size(),
|
||||||
handle.offset(), handle.size(), sn});
|
blob_index.offset(), blob_index.size(),
|
||||||
|
sn});
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (impl_->debug_level_ >= 3)
|
if (impl_->debug_level_ >= 3)
|
||||||
@ -178,7 +110,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
|
|||||||
db_impl_(nullptr),
|
db_impl_(nullptr),
|
||||||
env_(db_options.env),
|
env_(db_options.env),
|
||||||
ttl_extractor_(blob_db_options.ttl_extractor.get()),
|
ttl_extractor_(blob_db_options.ttl_extractor.get()),
|
||||||
wo_set_(false),
|
|
||||||
bdb_options_(blob_db_options),
|
bdb_options_(blob_db_options),
|
||||||
db_options_(db_options),
|
db_options_(db_options),
|
||||||
env_options_(db_options),
|
env_options_(db_options),
|
||||||
@ -235,7 +166,6 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
|
|||||||
BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
|
BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
|
||||||
: BlobDB(db),
|
: BlobDB(db),
|
||||||
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
|
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
|
||||||
wo_set_(false),
|
|
||||||
bdb_options_(blob_db_options),
|
bdb_options_(blob_db_options),
|
||||||
db_options_(db->GetOptions()),
|
db_options_(db->GetOptions()),
|
||||||
env_options_(db_->GetOptions()),
|
env_options_(db_->GetOptions()),
|
||||||
@ -610,17 +540,6 @@ std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
|
|||||||
return writer;
|
return writer;
|
||||||
}
|
}
|
||||||
|
|
||||||
void BlobDBImpl::UpdateWriteOptions(const WriteOptions& options) {
|
|
||||||
if (!wo_set_.load(std::memory_order_relaxed)) {
|
|
||||||
// DCLP
|
|
||||||
WriteLock wl(&mutex_);
|
|
||||||
if (!wo_set_.load(std::memory_order_acquire)) {
|
|
||||||
wo_set_.store(true, std::memory_order_release);
|
|
||||||
write_options_ = options;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
||||||
uint32_t val = blob_rgen.Next();
|
uint32_t val = blob_rgen.Next();
|
||||||
{
|
{
|
||||||
@ -736,14 +655,6 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
|||||||
return bfile;
|
return bfile;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
|
|
||||||
const Slice& value) {
|
|
||||||
std::string new_value;
|
|
||||||
Slice value_slice;
|
|
||||||
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
|
|
||||||
return PutUntil(options, key, value_slice, expiration);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
|
Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
|
||||||
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
|
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
|
||||||
Status s = db_->Delete(options, key);
|
Status s = db_->Delete(options, key);
|
||||||
@ -753,141 +664,94 @@ Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
|
||||||
class BlobInserter : public WriteBatch::Handler {
|
private:
|
||||||
private:
|
const WriteOptions& options_;
|
||||||
BlobDBImpl* impl_;
|
BlobDBImpl* blob_db_impl_;
|
||||||
SequenceNumber sequence_;
|
uint32_t default_cf_id_;
|
||||||
WriteBatch updates_blob_;
|
SequenceNumber sequence_;
|
||||||
std::shared_ptr<BlobFile> last_file_;
|
WriteBatch batch_;
|
||||||
bool has_put_;
|
|
||||||
std::string new_value_;
|
|
||||||
uint32_t default_cf_id_;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
|
BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
|
||||||
: impl_(impl),
|
uint32_t default_cf_id, SequenceNumber seq)
|
||||||
sequence_(seq),
|
: options_(options),
|
||||||
has_put_(false),
|
blob_db_impl_(blob_db_impl),
|
||||||
default_cf_id_(reinterpret_cast<ColumnFamilyHandleImpl*>(
|
default_cf_id_(default_cf_id),
|
||||||
impl_->DefaultColumnFamily())
|
sequence_(seq) {}
|
||||||
->cfd()
|
|
||||||
->GetID()) {}
|
|
||||||
|
|
||||||
SequenceNumber sequence() { return sequence_; }
|
SequenceNumber sequence() { return sequence_; }
|
||||||
|
|
||||||
WriteBatch* updates_blob() { return &updates_blob_; }
|
WriteBatch* batch() { return &batch_; }
|
||||||
|
|
||||||
std::shared_ptr<BlobFile>& last_file() { return last_file_; }
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
||||||
|
const Slice& value) override {
|
||||||
bool has_put() { return has_put_; }
|
if (column_family_id != default_cf_id_) {
|
||||||
|
return Status::NotSupported(
|
||||||
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
"Blob DB doesn't support non-default column family.");
|
||||||
const Slice& value_slice) override {
|
|
||||||
if (column_family_id != default_cf_id_) {
|
|
||||||
return Status::NotSupported(
|
|
||||||
"Blob DB doesn't support non-default column family.");
|
|
||||||
}
|
|
||||||
Slice value_unc;
|
|
||||||
uint64_t expiration =
|
|
||||||
impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
|
|
||||||
|
|
||||||
std::shared_ptr<BlobFile> bfile =
|
|
||||||
(expiration != kNoExpiration)
|
|
||||||
? impl_->SelectBlobFileTTL(expiration)
|
|
||||||
: ((last_file_) ? last_file_ : impl_->SelectBlobFile());
|
|
||||||
if (last_file_ && last_file_ != bfile) {
|
|
||||||
return Status::NotFound("too many blob files");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!bfile) {
|
|
||||||
return Status::NotFound("blob file not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
last_file_ = bfile;
|
|
||||||
has_put_ = true;
|
|
||||||
|
|
||||||
std::string compression_output;
|
|
||||||
Slice value = impl_->GetCompressedSlice(value_unc, &compression_output);
|
|
||||||
|
|
||||||
std::string headerbuf;
|
|
||||||
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
|
|
||||||
std::string index_entry;
|
|
||||||
Status s = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry);
|
|
||||||
if (!s.ok()) {
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
bfile->ExtendSequenceRange(sequence_);
|
|
||||||
sequence_++;
|
|
||||||
|
|
||||||
if (expiration != kNoExpiration) {
|
|
||||||
extendTTL(&(bfile->ttl_range_), expiration);
|
|
||||||
}
|
|
||||||
|
|
||||||
return WriteBatchInternal::PutBlobIndex(&updates_blob_, column_family_id,
|
|
||||||
key, index_entry);
|
|
||||||
}
|
}
|
||||||
|
std::string new_value;
|
||||||
|
Slice value_slice;
|
||||||
|
uint64_t expiration =
|
||||||
|
blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value);
|
||||||
|
Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice,
|
||||||
|
expiration, sequence_, &batch_);
|
||||||
|
sequence_++;
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
virtual Status DeleteCF(uint32_t column_family_id,
|
virtual Status DeleteCF(uint32_t column_family_id,
|
||||||
const Slice& key) override {
|
const Slice& key) override {
|
||||||
if (column_family_id != default_cf_id_) {
|
if (column_family_id != default_cf_id_) {
|
||||||
return Status::NotSupported(
|
return Status::NotSupported(
|
||||||
"Blob DB doesn't support non-default column family.");
|
"Blob DB doesn't support non-default column family.");
|
||||||
}
|
|
||||||
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
|
|
||||||
sequence_++;
|
|
||||||
return Status::OK();
|
|
||||||
}
|
}
|
||||||
|
Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
|
||||||
|
sequence_++;
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
virtual Status DeleteRange(uint32_t column_family_id,
|
virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
|
||||||
const Slice& begin_key, const Slice& end_key) {
|
const Slice& end_key) {
|
||||||
if (column_family_id != default_cf_id_) {
|
if (column_family_id != default_cf_id_) {
|
||||||
return Status::NotSupported(
|
return Status::NotSupported(
|
||||||
"Blob DB doesn't support non-default column family.");
|
"Blob DB doesn't support non-default column family.");
|
||||||
}
|
|
||||||
WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id,
|
|
||||||
begin_key, end_key);
|
|
||||||
sequence_++;
|
|
||||||
return Status::OK();
|
|
||||||
}
|
}
|
||||||
|
Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
|
||||||
|
begin_key, end_key);
|
||||||
|
sequence_++;
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
|
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
|
||||||
const Slice& /*key*/) override {
|
const Slice& /*key*/) override {
|
||||||
return Status::NotSupported("Not supported operation in blob db.");
|
return Status::NotSupported("Not supported operation in blob db.");
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
|
virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
|
||||||
const Slice& /*value*/) override {
|
const Slice& /*value*/) override {
|
||||||
return Status::NotSupported("Not supported operation in blob db.");
|
return Status::NotSupported("Not supported operation in blob db.");
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual void LogData(const Slice& blob) override {
|
virtual void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
|
||||||
updates_blob_.PutLogData(blob);
|
};
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
|
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
|
||||||
MutexLock l(&write_mutex_);
|
MutexLock l(&write_mutex_);
|
||||||
|
|
||||||
SequenceNumber current_seq = db_impl_->GetLatestSequenceNumber() + 1;
|
uint32_t default_cf_id =
|
||||||
BlobInserter blob_inserter(this, current_seq);
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
||||||
|
SequenceNumber current_seq = GetLatestSequenceNumber() + 1;
|
||||||
|
BlobInserter blob_inserter(options, this, default_cf_id, current_seq);
|
||||||
Status s = updates->Iterate(&blob_inserter);
|
Status s = updates->Iterate(&blob_inserter);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
s = db_->Write(opts, blob_inserter.updates_blob());
|
s = db_->Write(options, blob_inserter.batch());
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
assert(current_seq ==
|
assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1);
|
||||||
WriteBatchInternal::Sequence(blob_inserter.updates_blob()));
|
|
||||||
assert(blob_inserter.sequence() ==
|
|
||||||
current_seq + WriteBatchInternal::Count(blob_inserter.updates_blob()));
|
|
||||||
if (blob_inserter.has_put()) {
|
|
||||||
s = CloseBlobFileIfNeeded(blob_inserter.last_file());
|
|
||||||
if (!s.ok()) {
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// add deleted key to list of keys that have been deleted for book-keeping
|
// add deleted key to list of keys that have been deleted for book-keeping
|
||||||
class DeleteBookkeeper : public WriteBatch::Handler {
|
class DeleteBookkeeper : public WriteBatch::Handler {
|
||||||
@ -956,12 +820,92 @@ void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value) {
|
||||||
|
std::string new_value;
|
||||||
|
Slice value_slice;
|
||||||
|
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
|
||||||
|
return PutUntil(options, key, value_slice, expiration);
|
||||||
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
||||||
const Slice& key, const Slice& value,
|
const Slice& key, const Slice& value,
|
||||||
uint64_t ttl) {
|
uint64_t ttl) {
|
||||||
uint64_t now = EpochNow();
|
uint64_t now = EpochNow();
|
||||||
assert(std::numeric_limits<uint64_t>::max() - now > ttl);
|
uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
|
||||||
return PutUntil(options, key, value, now + ttl);
|
return PutUntil(options, key, value, expiration);
|
||||||
|
}
|
||||||
|
|
||||||
|
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value, uint64_t expiration) {
|
||||||
|
MutexLock l(&write_mutex_);
|
||||||
|
SequenceNumber sequence = GetLatestSequenceNumber() + 1;
|
||||||
|
WriteBatch batch;
|
||||||
|
Status s = PutBlobValue(options, key, value, expiration, sequence, &batch);
|
||||||
|
if (s.ok()) {
|
||||||
|
s = db_->Write(options, &batch);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value, uint64_t expiration,
|
||||||
|
SequenceNumber sequence, WriteBatch* batch) {
|
||||||
|
TEST_SYNC_POINT("BlobDBImpl::PutBlobValue:Start");
|
||||||
|
Status s;
|
||||||
|
std::string index_entry;
|
||||||
|
uint32_t column_family_id =
|
||||||
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
||||||
|
if (value.size() < bdb_options_.min_blob_size) {
|
||||||
|
if (expiration == kNoExpiration) {
|
||||||
|
// Put as normal value
|
||||||
|
s = batch->Put(key, value);
|
||||||
|
} else {
|
||||||
|
// Inlined with TTL
|
||||||
|
BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
|
||||||
|
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
|
||||||
|
index_entry);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
|
||||||
|
? SelectBlobFileTTL(expiration)
|
||||||
|
: SelectBlobFile();
|
||||||
|
if (!bfile) {
|
||||||
|
return Status::NotFound("Blob file not found");
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string compression_output;
|
||||||
|
Slice value_compressed = GetCompressedSlice(value, &compression_output);
|
||||||
|
|
||||||
|
std::string headerbuf;
|
||||||
|
Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration,
|
||||||
|
-1);
|
||||||
|
|
||||||
|
s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
|
||||||
|
&index_entry);
|
||||||
|
|
||||||
|
if (s.ok()) {
|
||||||
|
bfile->ExtendSequenceRange(sequence);
|
||||||
|
if (expiration != kNoExpiration) {
|
||||||
|
extendTTL(&(bfile->ttl_range_), expiration);
|
||||||
|
}
|
||||||
|
s = CloseBlobFileIfNeeded(bfile);
|
||||||
|
if (s.ok()) {
|
||||||
|
s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
|
||||||
|
index_entry);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
|
"Failed to append blob to FILE: %s: KEY: %s VALSZ: %d"
|
||||||
|
" status: '%s' blob_file: '%s'",
|
||||||
|
bfile->PathName().c_str(), key.ToString().c_str(),
|
||||||
|
value.size(), s.ToString().c_str(),
|
||||||
|
bfile->DumpState().c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_SYNC_POINT("BlobDBImpl::PutBlobValue:Finish");
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
||||||
@ -976,63 +920,6 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
|||||||
return *compression_output;
|
return *compression_output;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
|
|
||||||
const Slice& value_unc, uint64_t expiration) {
|
|
||||||
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
|
|
||||||
MutexLock l(&write_mutex_);
|
|
||||||
UpdateWriteOptions(options);
|
|
||||||
|
|
||||||
std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
|
|
||||||
? SelectBlobFileTTL(expiration)
|
|
||||||
: SelectBlobFile();
|
|
||||||
|
|
||||||
if (!bfile) return Status::NotFound("Blob file not found");
|
|
||||||
|
|
||||||
std::string compression_output;
|
|
||||||
Slice value = GetCompressedSlice(value_unc, &compression_output);
|
|
||||||
|
|
||||||
std::string headerbuf;
|
|
||||||
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
|
|
||||||
|
|
||||||
std::string index_entry;
|
|
||||||
Status s = AppendBlob(bfile, headerbuf, key, value, &index_entry);
|
|
||||||
if (!s.ok()) {
|
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
||||||
"Failed to append blob to FILE: %s: KEY: %s VALSZ: %d"
|
|
||||||
" status: '%s' blob_file: '%s'",
|
|
||||||
bfile->PathName().c_str(), key.ToString().c_str(),
|
|
||||||
value.size(), s.ToString().c_str(),
|
|
||||||
bfile->DumpState().c_str());
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
WriteBatch batch;
|
|
||||||
uint32_t column_family_id =
|
|
||||||
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
|
||||||
s = WriteBatchInternal::PutBlobIndex(&batch, column_family_id, key,
|
|
||||||
index_entry);
|
|
||||||
|
|
||||||
// this goes to the base db and can be expensive
|
|
||||||
if (s.ok()) {
|
|
||||||
s = db_->Write(options, &batch);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (s.ok()) {
|
|
||||||
// this is the sequence number of the write.
|
|
||||||
SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
|
|
||||||
bfile->ExtendSequenceRange(sn);
|
|
||||||
|
|
||||||
if (expiration != kNoExpiration) {
|
|
||||||
extendTTL(&(bfile->ttl_range_), expiration);
|
|
||||||
}
|
|
||||||
|
|
||||||
s = CloseBlobFileIfNeeded(bfile);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
||||||
Slice* value_slice,
|
Slice* value_slice,
|
||||||
std::string* new_value) {
|
std::string* new_value) {
|
||||||
@ -1049,7 +936,8 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
|||||||
|
|
||||||
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
||||||
const std::string& headerbuf, const Slice& key,
|
const std::string& headerbuf, const Slice& key,
|
||||||
const Slice& value, std::string* index_entry) {
|
const Slice& value, uint64_t expiration,
|
||||||
|
std::string* index_entry) {
|
||||||
auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
|
auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
|
||||||
if (bdb_options_.blob_dir_size > 0 &&
|
if (bdb_options_.blob_dir_size > 0 &&
|
||||||
(total_blob_space_.load() + size_put) > bdb_options_.blob_dir_size) {
|
(total_blob_space_.load() + size_put) > bdb_options_.blob_dir_size) {
|
||||||
@ -1086,18 +974,14 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
|||||||
last_period_write_ += size_put;
|
last_period_write_ += size_put;
|
||||||
total_blob_space_ += size_put;
|
total_blob_space_ += size_put;
|
||||||
|
|
||||||
BlobHandle handle;
|
if (expiration == kNoExpiration) {
|
||||||
handle.set_filenumber(bfile->BlobFileNumber());
|
BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
|
||||||
handle.set_size(value.size());
|
value.size(), bdb_options_.compression);
|
||||||
handle.set_offset(blob_offset);
|
} else {
|
||||||
handle.set_compression(bdb_options_.compression);
|
BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
|
||||||
handle.EncodeTo(index_entry);
|
blob_offset, value.size(),
|
||||||
|
bdb_options_.compression);
|
||||||
if (debug_level_ >= 3)
|
}
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
|
||||||
">Adding KEY FILE: %s: BC: %d OFFSET: %d SZ: %d",
|
|
||||||
bfile->PathName().c_str(), bfile->blob_count_.load(),
|
|
||||||
blob_offset, value.size());
|
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -1138,29 +1022,45 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
|
|||||||
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||||
PinnableSlice* value) {
|
PinnableSlice* value) {
|
||||||
assert(value != nullptr);
|
assert(value != nullptr);
|
||||||
BlobHandle handle;
|
BlobIndex blob_index;
|
||||||
Status s = handle.DecodeFrom(index_entry);
|
Status s = blob_index.DecodeFrom(index_entry);
|
||||||
if (!s.ok()) return s;
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
|
||||||
|
return Status::NotFound("Key expired");
|
||||||
|
}
|
||||||
|
if (blob_index.IsInlined()) {
|
||||||
|
// TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
|
||||||
|
// memory buffer to avoid extra copy.
|
||||||
|
value->PinSelf(blob_index.value());
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
if (blob_index.size() == 0) {
|
||||||
|
value->PinSelf("");
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
// offset has to have certain min, as we will read CRC
|
// offset has to have certain min, as we will read CRC
|
||||||
// later from the Blob Header, which needs to be also a
|
// later from the Blob Header, which needs to be also a
|
||||||
// valid offset.
|
// valid offset.
|
||||||
if (handle.offset() <
|
if (blob_index.offset() <
|
||||||
(BlobLogHeader::kHeaderSize + BlobLogRecord::kHeaderSize + key.size())) {
|
(BlobLogHeader::kHeaderSize + BlobLogRecord::kHeaderSize + key.size())) {
|
||||||
if (debug_level_ >= 2) {
|
if (debug_level_ >= 2) {
|
||||||
ROCKS_LOG_ERROR(
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
db_options_.info_log,
|
"Invalid blob index file_number: %" PRIu64
|
||||||
"Invalid blob handle file_number: %" PRIu64 " blob_offset: %" PRIu64
|
" blob_offset: %" PRIu64 " blob_size: %" PRIu64
|
||||||
" blob_size: %" PRIu64 " key: %s",
|
" key: %s",
|
||||||
handle.filenumber(), handle.offset(), handle.size(), key.data());
|
blob_index.file_number(), blob_index.offset(),
|
||||||
|
blob_index.size(), key.data());
|
||||||
}
|
}
|
||||||
return Status::NotFound("Blob Not Found, although found in LSM");
|
return Status::NotFound("Invalid blob offset");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::shared_ptr<BlobFile> bfile;
|
std::shared_ptr<BlobFile> bfile;
|
||||||
{
|
{
|
||||||
ReadLock rl(&mutex_);
|
ReadLock rl(&mutex_);
|
||||||
auto hitr = blob_files_.find(handle.filenumber());
|
auto hitr = blob_files_.find(blob_index.file_number());
|
||||||
|
|
||||||
// file was deleted
|
// file was deleted
|
||||||
if (hitr == blob_files_.end()) {
|
if (hitr == blob_files_.end()) {
|
||||||
@ -1170,7 +1070,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
bfile = hitr->second;
|
bfile = hitr->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle.size() == 0 && value != nullptr) {
|
if (blob_index.size() == 0 && value != nullptr) {
|
||||||
value->PinSelf("");
|
value->PinSelf("");
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
@ -1186,19 +1086,19 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// allocate the buffer. This is safe in C++11
|
// allocate the buffer. This is safe in C++11
|
||||||
valueptr->resize(handle.size());
|
valueptr->resize(blob_index.size());
|
||||||
char* buffer = &(*valueptr)[0];
|
char* buffer = &(*valueptr)[0];
|
||||||
|
|
||||||
Slice blob_value;
|
Slice blob_value;
|
||||||
s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer);
|
s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value, buffer);
|
||||||
if (!s.ok() || blob_value.size() != handle.size()) {
|
if (!s.ok() || blob_value.size() != blob_index.size()) {
|
||||||
if (debug_level_ >= 2) {
|
if (debug_level_ >= 2) {
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
"Failed to read blob from file: %s blob_offset: %" PRIu64
|
"Failed to read blob from file: %s blob_offset: %" PRIu64
|
||||||
" blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
|
" blob_size: %" PRIu64 " read: %d key: %s status: '%s'",
|
||||||
bfile->PathName().c_str(), handle.offset(), handle.size(),
|
bfile->PathName().c_str(), blob_index.offset(),
|
||||||
static_cast<int>(blob_value.size()), key.data(),
|
blob_index.size(), static_cast<int>(blob_value.size()),
|
||||||
s.ToString().c_str());
|
key.data(), s.ToString().c_str());
|
||||||
}
|
}
|
||||||
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
|
return Status::NotFound("Blob Not Found as couldnt retrieve Blob");
|
||||||
}
|
}
|
||||||
@ -1208,15 +1108,15 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
std::string crc_str;
|
std::string crc_str;
|
||||||
crc_str.resize(sizeof(uint32_t));
|
crc_str.resize(sizeof(uint32_t));
|
||||||
char* crc_buffer = &(crc_str[0]);
|
char* crc_buffer = &(crc_str[0]);
|
||||||
s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)),
|
s = reader->Read(blob_index.offset() - (key.size() + sizeof(uint32_t)),
|
||||||
sizeof(uint32_t), &crc_slice, crc_buffer);
|
sizeof(uint32_t), &crc_slice, crc_buffer);
|
||||||
if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
|
if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) {
|
||||||
if (debug_level_ >= 2) {
|
if (debug_level_ >= 2) {
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
"Failed to fetch blob crc file: %s blob_offset: %" PRIu64
|
"Failed to fetch blob crc file: %s blob_offset: %" PRIu64
|
||||||
" blob_size: %" PRIu64 " key: %s status: '%s'",
|
" blob_size: %" PRIu64 " key: %s status: '%s'",
|
||||||
bfile->PathName().c_str(), handle.offset(), handle.size(),
|
bfile->PathName().c_str(), blob_index.offset(),
|
||||||
key.data(), s.ToString().c_str());
|
blob_index.size(), key.data(), s.ToString().c_str());
|
||||||
}
|
}
|
||||||
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
|
return Status::NotFound("Blob Not Found as couldnt retrieve CRC");
|
||||||
}
|
}
|
||||||
@ -1228,8 +1128,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
|
|||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
"Blob crc mismatch file: %s blob_offset: %" PRIu64
|
"Blob crc mismatch file: %s blob_offset: %" PRIu64
|
||||||
" blob_size: %" PRIu64 " key: %s status: '%s'",
|
" blob_size: %" PRIu64 " key: %s status: '%s'",
|
||||||
bfile->PathName().c_str(), handle.offset(), handle.size(),
|
bfile->PathName().c_str(), blob_index.offset(),
|
||||||
key.data(), s.ToString().c_str());
|
blob_index.size(), key.data(), s.ToString().c_str());
|
||||||
}
|
}
|
||||||
return Status::Corruption("Corruption. Blob CRC mismatch");
|
return Status::Corruption("Corruption. Blob CRC mismatch");
|
||||||
}
|
}
|
||||||
@ -1357,8 +1257,9 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked(
|
|||||||
|
|
||||||
SequenceNumber esn = bfile->GetSNRange().first;
|
SequenceNumber esn = bfile->GetSNRange().first;
|
||||||
|
|
||||||
// this is not correct.
|
// TODO(yiwu): Here we should check instead if there is an active snapshot
|
||||||
// you want to check that there are no snapshots in the
|
// that lies between the first sequence in the file and the last sequence by
|
||||||
|
// the time the file finished being garbage collected.
|
||||||
bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn);
|
bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn);
|
||||||
if (notok) {
|
if (notok) {
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
ROCKS_LOG_INFO(db_options_.info_log,
|
||||||
@ -1398,16 +1299,16 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
|
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
|
||||||
BlobHandle handle;
|
BlobIndex blob_index;
|
||||||
Status s = handle.DecodeFrom(index_entry);
|
Status s = blob_index.DecodeFrom(index_entry);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
ROCKS_LOG_INFO(db_options_.info_log,
|
||||||
"Could not parse lsm val in MarkBlobDeleted %s",
|
"Could not parse lsm val in MarkBlobDeleted %s",
|
||||||
index_entry.ToString().c_str());
|
index_entry.ToString().c_str());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
bool succ = FindFileAndEvictABlob(handle.filenumber(), key.size(),
|
bool succ = FindFileAndEvictABlob(blob_index.file_number(), key.size(),
|
||||||
handle.offset(), handle.size());
|
blob_index.offset(), blob_index.size());
|
||||||
return succ;
|
return succ;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1756,16 +1657,16 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
BlobHandle handle;
|
BlobIndex blob_index;
|
||||||
s = handle.DecodeFrom(index_entry);
|
s = blob_index.DecodeFrom(index_entry);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
"Error while decoding index entry: %s",
|
"Error while decoding index entry: %s",
|
||||||
s.ToString().c_str());
|
s.ToString().c_str());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (handle.filenumber() != bfptr->BlobFileNumber() ||
|
if (blob_index.file_number() != bfptr->BlobFileNumber() ||
|
||||||
handle.offset() != blob_offset) {
|
blob_index.offset() != blob_offset) {
|
||||||
// Key has been overwritten. Drop the blob record.
|
// Key has been overwritten. Drop the blob record.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -1842,12 +1743,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|||||||
s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset,
|
s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset,
|
||||||
&new_blob_offset, record.GetTTL());
|
&new_blob_offset, record.GetTTL());
|
||||||
|
|
||||||
BlobHandle new_handle;
|
BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
|
||||||
new_handle.set_filenumber(newfile->BlobFileNumber());
|
new_blob_offset, record.Blob().size(),
|
||||||
new_handle.set_size(record.Blob().size());
|
bdb_options_.compression);
|
||||||
new_handle.set_offset(new_blob_offset);
|
|
||||||
new_handle.set_compression(bdb_options_.compression);
|
|
||||||
new_handle.EncodeTo(&new_index_entry);
|
|
||||||
|
|
||||||
newfile->blob_count_++;
|
newfile->blob_count_++;
|
||||||
newfile->file_size_ +=
|
newfile->file_size_ +=
|
||||||
@ -2268,6 +2166,11 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options,
|
|||||||
}
|
}
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
|
Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||||
|
PinnableSlice* value) {
|
||||||
|
return GetBlobValue(key, index_entry, value);
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
|
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
|
||||||
ReadLock l(&mutex_);
|
ReadLock l(&mutex_);
|
||||||
std::vector<std::shared_ptr<BlobFile>> blob_files;
|
std::vector<std::shared_ptr<BlobFile>> blob_files;
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include <set>
|
#include <set>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <unordered_map>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -215,9 +216,6 @@ class BlobDBImpl : public BlobDB {
|
|||||||
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
||||||
const Slice& key, PinnableSlice* value) override;
|
const Slice& key, PinnableSlice* value) override;
|
||||||
|
|
||||||
Status GetBlobValue(const Slice& key, const Slice& index_entry,
|
|
||||||
PinnableSlice* value);
|
|
||||||
|
|
||||||
using BlobDB::NewIterator;
|
using BlobDB::NewIterator;
|
||||||
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
|
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
|
||||||
|
|
||||||
@ -249,7 +247,7 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
using BlobDB::PutUntil;
|
using BlobDB::PutUntil;
|
||||||
Status PutUntil(const WriteOptions& options, const Slice& key,
|
Status PutUntil(const WriteOptions& options, const Slice& key,
|
||||||
const Slice& value_unc, uint64_t expiration) override;
|
const Slice& value, uint64_t expiration) override;
|
||||||
|
|
||||||
Status LinkToBaseDB(DB* db) override;
|
Status LinkToBaseDB(DB* db) override;
|
||||||
|
|
||||||
@ -263,6 +261,9 @@ class BlobDBImpl : public BlobDB {
|
|||||||
~BlobDBImpl();
|
~BlobDBImpl();
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
|
Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||||
|
PinnableSlice* value);
|
||||||
|
|
||||||
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
|
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
|
||||||
|
|
||||||
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
|
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
|
||||||
@ -281,6 +282,7 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
class GarbageCollectionWriteCallback;
|
class GarbageCollectionWriteCallback;
|
||||||
|
class BlobInserter;
|
||||||
|
|
||||||
Status OpenPhase1();
|
Status OpenPhase1();
|
||||||
|
|
||||||
@ -288,6 +290,9 @@ class BlobDBImpl : public BlobDB {
|
|||||||
// Return true if a snapshot is created.
|
// Return true if a snapshot is created.
|
||||||
bool SetSnapshotIfNeeded(ReadOptions* read_options);
|
bool SetSnapshotIfNeeded(ReadOptions* read_options);
|
||||||
|
|
||||||
|
Status GetBlobValue(const Slice& key, const Slice& index_entry,
|
||||||
|
PinnableSlice* value);
|
||||||
|
|
||||||
Slice GetCompressedSlice(const Slice& raw,
|
Slice GetCompressedSlice(const Slice& raw,
|
||||||
std::string* compression_output) const;
|
std::string* compression_output) const;
|
||||||
|
|
||||||
@ -314,9 +319,14 @@ class BlobDBImpl : public BlobDB {
|
|||||||
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
|
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
|
||||||
Slice* value_slice, std::string* new_value);
|
Slice* value_slice, std::string* new_value);
|
||||||
|
|
||||||
|
Status PutBlobValue(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value, uint64_t expiration,
|
||||||
|
SequenceNumber sequence, WriteBatch* batch);
|
||||||
|
|
||||||
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
||||||
const std::string& headerbuf, const Slice& key,
|
const std::string& headerbuf, const Slice& key,
|
||||||
const Slice& value, std::string* index_entry);
|
const Slice& value, uint64_t expiration,
|
||||||
|
std::string* index_entry);
|
||||||
|
|
||||||
// find an existing blob log file based on the expiration unix epoch
|
// find an existing blob log file based on the expiration unix epoch
|
||||||
// if such a file does not exist, return nullptr
|
// if such a file does not exist, return nullptr
|
||||||
@ -327,8 +337,6 @@ class BlobDBImpl : public BlobDB {
|
|||||||
|
|
||||||
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
|
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
|
||||||
|
|
||||||
void UpdateWriteOptions(const WriteOptions& options);
|
|
||||||
|
|
||||||
void Shutdown();
|
void Shutdown();
|
||||||
|
|
||||||
// periodic sanity check. Bunch of checks
|
// periodic sanity check. Bunch of checks
|
||||||
@ -426,10 +434,6 @@ class BlobDBImpl : public BlobDB {
|
|||||||
Env* env_;
|
Env* env_;
|
||||||
TTLExtractor* ttl_extractor_;
|
TTLExtractor* ttl_extractor_;
|
||||||
|
|
||||||
// a boolean to capture whether write_options has been set
|
|
||||||
std::atomic<bool> wo_set_;
|
|
||||||
WriteOptions write_options_;
|
|
||||||
|
|
||||||
// the options that govern the behavior of Blob Storage
|
// the options that govern the behavior of Blob Storage
|
||||||
BlobDBOptions bdb_options_;
|
BlobDBOptions bdb_options_;
|
||||||
DBOptions db_options_;
|
DBOptions db_options_;
|
||||||
|
@ -5,19 +5,24 @@
|
|||||||
|
|
||||||
#ifndef ROCKSDB_LITE
|
#ifndef ROCKSDB_LITE
|
||||||
|
|
||||||
#include "utilities/blob_db/blob_db.h"
|
#include <algorithm>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
#include "db/db_test_util.h"
|
#include "db/db_test_util.h"
|
||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
|
#include "rocksdb/utilities/debug.h"
|
||||||
#include "util/cast_util.h"
|
#include "util/cast_util.h"
|
||||||
#include "util/random.h"
|
#include "util/random.h"
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
#include "util/testharness.h"
|
#include "util/testharness.h"
|
||||||
|
#include "utilities/blob_db/blob_db.h"
|
||||||
#include "utilities/blob_db/blob_db_impl.h"
|
#include "utilities/blob_db/blob_db_impl.h"
|
||||||
|
#include "utilities/blob_db/blob_index.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
namespace blob_db {
|
namespace blob_db {
|
||||||
@ -26,6 +31,12 @@ class BlobDBTest : public testing::Test {
|
|||||||
public:
|
public:
|
||||||
const int kMaxBlobSize = 1 << 14;
|
const int kMaxBlobSize = 1 << 14;
|
||||||
|
|
||||||
|
struct BlobRecord {
|
||||||
|
std::string key;
|
||||||
|
std::string value;
|
||||||
|
uint64_t expiration = 0;
|
||||||
|
};
|
||||||
|
|
||||||
BlobDBTest()
|
BlobDBTest()
|
||||||
: dbname_(test::TmpDir() + "/blob_db_test"),
|
: dbname_(test::TmpDir() + "/blob_db_test"),
|
||||||
mock_env_(new MockTimeEnv(Env::Default())),
|
mock_env_(new MockTimeEnv(Env::Default())),
|
||||||
@ -127,6 +138,32 @@ class BlobDBTest : public testing::Test {
|
|||||||
delete iter;
|
delete iter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void VerifyBaseDB(
|
||||||
|
const std::map<std::string, KeyVersion> &expected_versions) {
|
||||||
|
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
|
||||||
|
DB *db = blob_db_->GetRootDB();
|
||||||
|
std::vector<KeyVersion> versions;
|
||||||
|
GetAllKeyVersions(db, "", "", &versions);
|
||||||
|
ASSERT_EQ(expected_versions.size(), versions.size());
|
||||||
|
size_t i = 0;
|
||||||
|
for (auto &key_version : expected_versions) {
|
||||||
|
const KeyVersion &expected_version = key_version.second;
|
||||||
|
ASSERT_EQ(expected_version.user_key, versions[i].user_key);
|
||||||
|
ASSERT_EQ(expected_version.sequence, versions[i].sequence);
|
||||||
|
ASSERT_EQ(expected_version.type, versions[i].type);
|
||||||
|
if (versions[i].type == kTypeValue) {
|
||||||
|
ASSERT_EQ(expected_version.value, versions[i].value);
|
||||||
|
} else {
|
||||||
|
ASSERT_EQ(kTypeBlobIndex, versions[i].type);
|
||||||
|
PinnableSlice value;
|
||||||
|
ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
|
||||||
|
versions[i].value, &value));
|
||||||
|
ASSERT_EQ(expected_version.value, value.ToString());
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void InsertBlobs() {
|
void InsertBlobs() {
|
||||||
WriteOptions wo;
|
WriteOptions wo;
|
||||||
std::string value;
|
std::string value;
|
||||||
@ -151,6 +188,7 @@ class BlobDBTest : public testing::Test {
|
|||||||
TEST_F(BlobDBTest, Put) {
|
TEST_F(BlobDBTest, Put) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::map<std::string, std::string> data;
|
std::map<std::string, std::string> data;
|
||||||
@ -166,6 +204,7 @@ TEST_F(BlobDBTest, PutWithTTL) {
|
|||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.ttl_range_secs = 1000;
|
bdb_options.ttl_range_secs = 1000;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options, options);
|
Open(bdb_options, options);
|
||||||
@ -195,6 +234,7 @@ TEST_F(BlobDBTest, PutUntil) {
|
|||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.ttl_range_secs = 1000;
|
bdb_options.ttl_range_secs = 1000;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options, options);
|
Open(bdb_options, options);
|
||||||
@ -226,6 +266,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
|
|||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.ttl_range_secs = 1000;
|
bdb_options.ttl_range_secs = 1000;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
bdb_options.num_concurrent_simple_blobs = 1;
|
bdb_options.num_concurrent_simple_blobs = 1;
|
||||||
bdb_options.ttl_extractor = ttl_extractor_;
|
bdb_options.ttl_extractor = ttl_extractor_;
|
||||||
@ -275,6 +316,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
|
|||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.ttl_range_secs = 1000;
|
bdb_options.ttl_range_secs = 1000;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
bdb_options.ttl_extractor = ttl_extractor_;
|
bdb_options.ttl_extractor = ttl_extractor_;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
@ -322,6 +364,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
|
|||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.ttl_range_secs = 1000;
|
bdb_options.ttl_range_secs = 1000;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
bdb_options.ttl_extractor = ttl_extractor_;
|
bdb_options.ttl_extractor = ttl_extractor_;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
@ -369,6 +412,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
|
|||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
bdb_options.ttl_range_secs = 1000;
|
bdb_options.ttl_range_secs = 1000;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
|
bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
@ -403,6 +447,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
|
|||||||
TEST_F(BlobDBTest, StackableDBGet) {
|
TEST_F(BlobDBTest, StackableDBGet) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::map<std::string, std::string> data;
|
std::map<std::string, std::string> data;
|
||||||
@ -425,6 +470,7 @@ TEST_F(BlobDBTest, StackableDBGet) {
|
|||||||
TEST_F(BlobDBTest, WriteBatch) {
|
TEST_F(BlobDBTest, WriteBatch) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::map<std::string, std::string> data;
|
std::map<std::string, std::string> data;
|
||||||
@ -441,6 +487,7 @@ TEST_F(BlobDBTest, WriteBatch) {
|
|||||||
TEST_F(BlobDBTest, Delete) {
|
TEST_F(BlobDBTest, Delete) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::map<std::string, std::string> data;
|
std::map<std::string, std::string> data;
|
||||||
@ -456,6 +503,7 @@ TEST_F(BlobDBTest, Delete) {
|
|||||||
TEST_F(BlobDBTest, DeleteBatch) {
|
TEST_F(BlobDBTest, DeleteBatch) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
for (size_t i = 0; i < 100; i++) {
|
for (size_t i = 0; i < 100; i++) {
|
||||||
@ -473,6 +521,7 @@ TEST_F(BlobDBTest, DeleteBatch) {
|
|||||||
TEST_F(BlobDBTest, Override) {
|
TEST_F(BlobDBTest, Override) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::map<std::string, std::string> data;
|
std::map<std::string, std::string> data;
|
||||||
@ -490,6 +539,7 @@ TEST_F(BlobDBTest, Override) {
|
|||||||
TEST_F(BlobDBTest, Compression) {
|
TEST_F(BlobDBTest, Compression) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
bdb_options.compression = CompressionType::kSnappyCompression;
|
bdb_options.compression = CompressionType::kSnappyCompression;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
@ -541,6 +591,7 @@ TEST_F(BlobDBTest, MultipleWriters) {
|
|||||||
TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
|
TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
BlobDBImpl *blob_db_impl =
|
BlobDBImpl *blob_db_impl =
|
||||||
@ -580,6 +631,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
|
|||||||
TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
|
TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
|
ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
|
||||||
@ -591,8 +643,8 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
|
|||||||
|
|
||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
|
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
|
||||||
"BlobDBImpl::PutUntil:Start"},
|
"BlobDBImpl::PutBlobValue:Start"},
|
||||||
{"BlobDBImpl::PutUntil:Finish",
|
{"BlobDBImpl::PutBlobValue:Finish",
|
||||||
"BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
|
"BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
@ -615,6 +667,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
|
|||||||
Options options;
|
Options options;
|
||||||
options.env = mock_env_.get();
|
options.env = mock_env_.get();
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options, options);
|
Open(bdb_options, options);
|
||||||
mock_env_->set_current_time(100);
|
mock_env_->set_current_time(100);
|
||||||
@ -628,8 +681,8 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
|
|||||||
|
|
||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
|
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
|
||||||
"BlobDBImpl::PutUntil:Start"},
|
"BlobDBImpl::PutBlobValue:Start"},
|
||||||
{"BlobDBImpl::PutUntil:Finish",
|
{"BlobDBImpl::PutBlobValue:Finish",
|
||||||
"BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
|
"BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
@ -656,6 +709,7 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
|
|||||||
bdb_options.is_fifo = true;
|
bdb_options.is_fifo = true;
|
||||||
bdb_options.blob_dir_size = 100;
|
bdb_options.blob_dir_size = 100;
|
||||||
bdb_options.blob_file_size = 100;
|
bdb_options.blob_file_size = 100;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::string value(100, 'v');
|
std::string value(100, 'v');
|
||||||
@ -687,6 +741,7 @@ TEST_F(BlobDBTest, ReadWhileGC) {
|
|||||||
// run the same test for Get(), MultiGet() and Iterator each.
|
// run the same test for Get(), MultiGet() and Iterator each.
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < 2; i++) {
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
blob_db_->Put(WriteOptions(), "foo", "bar");
|
blob_db_->Put(WriteOptions(), "foo", "bar");
|
||||||
@ -798,6 +853,7 @@ TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
|
|||||||
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
|
TEST_F(BlobDBTest, GetLiveFilesMetaData) {
|
||||||
Random rnd(301);
|
Random rnd(301);
|
||||||
BlobDBOptions bdb_options;
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.min_blob_size = 0;
|
||||||
bdb_options.disable_background_tasks = true;
|
bdb_options.disable_background_tasks = true;
|
||||||
Open(bdb_options);
|
Open(bdb_options);
|
||||||
std::map<std::string, std::string> data;
|
std::map<std::string, std::string> data;
|
||||||
@ -894,6 +950,75 @@ TEST_F(BlobDBTest, OutOfSpace) {
|
|||||||
ASSERT_TRUE(s.IsNoSpace());
|
ASSERT_TRUE(s.IsNoSpace());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(BlobDBTest, InlineSmallValues) {
|
||||||
|
constexpr uint64_t kMaxExpiration = 1000;
|
||||||
|
Random rnd(301);
|
||||||
|
BlobDBOptions bdb_options;
|
||||||
|
bdb_options.ttl_range_secs = kMaxExpiration;
|
||||||
|
bdb_options.min_blob_size = 100;
|
||||||
|
bdb_options.blob_file_size = 256 * 1000 * 1000;
|
||||||
|
bdb_options.disable_background_tasks = true;
|
||||||
|
Options options;
|
||||||
|
options.env = mock_env_.get();
|
||||||
|
mock_env_->set_current_time(0);
|
||||||
|
Open(bdb_options, options);
|
||||||
|
std::map<std::string, std::string> data;
|
||||||
|
std::map<std::string, KeyVersion> versions;
|
||||||
|
SequenceNumber first_non_ttl_seq = kMaxSequenceNumber;
|
||||||
|
SequenceNumber first_ttl_seq = kMaxSequenceNumber;
|
||||||
|
SequenceNumber last_non_ttl_seq = 0;
|
||||||
|
SequenceNumber last_ttl_seq = 0;
|
||||||
|
for (size_t i = 0; i < 1000; i++) {
|
||||||
|
bool is_small_value = rnd.Next() % 2;
|
||||||
|
bool has_ttl = rnd.Next() % 2;
|
||||||
|
uint64_t expiration = rnd.Next() % kMaxExpiration;
|
||||||
|
int len = is_small_value ? 50 : 200;
|
||||||
|
std::string key = "key" + ToString(i);
|
||||||
|
std::string value = test::RandomHumanReadableString(&rnd, len);
|
||||||
|
std::string blob_index;
|
||||||
|
data[key] = value;
|
||||||
|
SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
|
||||||
|
if (!has_ttl) {
|
||||||
|
ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
|
||||||
|
} else {
|
||||||
|
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
|
||||||
|
}
|
||||||
|
ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
|
||||||
|
versions[key] =
|
||||||
|
KeyVersion(key, value, sequence,
|
||||||
|
(is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
|
||||||
|
if (!is_small_value) {
|
||||||
|
if (!has_ttl) {
|
||||||
|
first_non_ttl_seq = std::min(first_non_ttl_seq, sequence);
|
||||||
|
last_non_ttl_seq = std::max(last_non_ttl_seq, sequence);
|
||||||
|
} else {
|
||||||
|
first_ttl_seq = std::min(first_ttl_seq, sequence);
|
||||||
|
last_ttl_seq = std::max(last_ttl_seq, sequence);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
VerifyDB(data);
|
||||||
|
VerifyBaseDB(versions);
|
||||||
|
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
|
||||||
|
auto blob_files = bdb_impl->TEST_GetBlobFiles();
|
||||||
|
ASSERT_EQ(2, blob_files.size());
|
||||||
|
std::shared_ptr<BlobFile> non_ttl_file;
|
||||||
|
std::shared_ptr<BlobFile> ttl_file;
|
||||||
|
if (blob_files[0]->HasTTL()) {
|
||||||
|
ttl_file = blob_files[0];
|
||||||
|
non_ttl_file = blob_files[1];
|
||||||
|
} else {
|
||||||
|
non_ttl_file = blob_files[0];
|
||||||
|
ttl_file = blob_files[1];
|
||||||
|
}
|
||||||
|
ASSERT_FALSE(non_ttl_file->HasTTL());
|
||||||
|
ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSNRange().first);
|
||||||
|
ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSNRange().second);
|
||||||
|
ASSERT_TRUE(ttl_file->HasTTL());
|
||||||
|
ASSERT_EQ(first_ttl_seq, ttl_file->GetSNRange().first);
|
||||||
|
ASSERT_EQ(last_ttl_seq, ttl_file->GetSNRange().second);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace blob_db
|
} // namespace blob_db
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
#include "db/dbformat.h"
|
||||||
#include "util/filename.h"
|
#include "util/filename.h"
|
||||||
#include "util/logging.h"
|
#include "util/logging.h"
|
||||||
#include "utilities/blob_db/blob_db_impl.h"
|
#include "utilities/blob_db/blob_db_impl.h"
|
||||||
@ -36,7 +37,7 @@ BlobFile::BlobFile()
|
|||||||
gc_once_after_open_(false),
|
gc_once_after_open_(false),
|
||||||
ttl_range_(std::make_pair(0, 0)),
|
ttl_range_(std::make_pair(0, 0)),
|
||||||
time_range_(std::make_pair(0, 0)),
|
time_range_(std::make_pair(0, 0)),
|
||||||
sn_range_(std::make_pair(0, 0)),
|
sn_range_(std::make_pair(kMaxSequenceNumber, 0)),
|
||||||
last_access_(-1),
|
last_access_(-1),
|
||||||
last_fsync_(0),
|
last_fsync_(0),
|
||||||
header_valid_(false) {}
|
header_valid_(false) {}
|
||||||
@ -55,7 +56,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn)
|
|||||||
gc_once_after_open_(false),
|
gc_once_after_open_(false),
|
||||||
ttl_range_(std::make_pair(0, 0)),
|
ttl_range_(std::make_pair(0, 0)),
|
||||||
time_range_(std::make_pair(0, 0)),
|
time_range_(std::make_pair(0, 0)),
|
||||||
sn_range_(std::make_pair(0, 0)),
|
sn_range_(std::make_pair(kMaxSequenceNumber, 0)),
|
||||||
last_access_(-1),
|
last_access_(-1),
|
||||||
last_fsync_(0),
|
last_fsync_(0),
|
||||||
header_valid_(false) {}
|
header_valid_(false) {}
|
||||||
|
161
utilities/blob_db/blob_index.h
Normal file
161
utilities/blob_db/blob_index.h
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
#pragma once
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
|
||||||
|
#include "rocksdb/options.h"
|
||||||
|
#include "util/coding.h"
|
||||||
|
#include "util/string_util.h"
|
||||||
|
|
||||||
|
namespace rocksdb {
|
||||||
|
namespace blob_db {
|
||||||
|
|
||||||
|
// BlobIndex is a pointer to the blob and metadata of the blob. The index is
|
||||||
|
// stored in base DB as ValueType::kTypeBlobIndex.
|
||||||
|
// There are three types of blob index:
|
||||||
|
//
|
||||||
|
// kInlinedTTL:
|
||||||
|
// +------+------------+---------------+
|
||||||
|
// | type | expiration | value |
|
||||||
|
// +------+------------+---------------+
|
||||||
|
// | char | varint64 | variable size |
|
||||||
|
// +------+------------+---------------+
|
||||||
|
//
|
||||||
|
// kBlob:
|
||||||
|
// +------+-------------+----------+----------+-------------+
|
||||||
|
// | type | file number | offset | size | compression |
|
||||||
|
// +------+-------------+----------+----------+-------------+
|
||||||
|
// | char | varint64 | varint64 | varint64 | char |
|
||||||
|
// +------+-------------+----------+----------+-------------+
|
||||||
|
//
|
||||||
|
// kBlobTTL:
|
||||||
|
// +------+------------+-------------+----------+----------+-------------+
|
||||||
|
// | type | expiration | file number | offset | size | compression |
|
||||||
|
// +------+------------+-------------+----------+----------+-------------+
|
||||||
|
// | char | varint64 | varint64 | varint64 | varint64 | char |
|
||||||
|
// +------+------------+-------------+----------+----------+-------------+
|
||||||
|
//
|
||||||
|
// There isn't a kInlined (without TTL) type since we can store it as a plain
|
||||||
|
// value (i.e. ValueType::kTypeValue).
|
||||||
|
class BlobIndex {
|
||||||
|
public:
|
||||||
|
enum class Type : unsigned char {
|
||||||
|
kInlinedTTL = 0,
|
||||||
|
kBlob = 1,
|
||||||
|
kBlobTTL = 2,
|
||||||
|
kUnknown = 3,
|
||||||
|
};
|
||||||
|
|
||||||
|
BlobIndex() : type_(Type::kUnknown) {}
|
||||||
|
|
||||||
|
bool IsInlined() const { return type_ == Type::kInlinedTTL; }
|
||||||
|
|
||||||
|
bool HasTTL() const {
|
||||||
|
return type_ == Type::kInlinedTTL || type_ == Type::kBlobTTL;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t expiration() const {
|
||||||
|
assert(HasTTL());
|
||||||
|
return expiration_;
|
||||||
|
}
|
||||||
|
|
||||||
|
const Slice& value() const {
|
||||||
|
assert(IsInlined());
|
||||||
|
return value_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t file_number() const {
|
||||||
|
assert(!IsInlined());
|
||||||
|
return file_number_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t offset() const {
|
||||||
|
assert(!IsInlined());
|
||||||
|
return offset_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t size() const {
|
||||||
|
assert(!IsInlined());
|
||||||
|
return size_;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status DecodeFrom(Slice slice) {
|
||||||
|
static const std::string kErrorMessage = "Error while decoding blob index";
|
||||||
|
assert(slice.size() > 0);
|
||||||
|
type_ = static_cast<Type>(*slice.data());
|
||||||
|
if (type_ >= Type::kUnknown) {
|
||||||
|
return Status::Corruption(
|
||||||
|
kErrorMessage,
|
||||||
|
"Unknown blob index type: " + ToString(static_cast<char>(type_)));
|
||||||
|
}
|
||||||
|
slice = Slice(slice.data() + 1, slice.size() - 1);
|
||||||
|
if (HasTTL()) {
|
||||||
|
if (!GetVarint64(&slice, &expiration_)) {
|
||||||
|
return Status::Corruption(kErrorMessage, "Corrupted expiration");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (IsInlined()) {
|
||||||
|
value_ = slice;
|
||||||
|
} else {
|
||||||
|
if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) &&
|
||||||
|
GetVarint64(&slice, &size_) && slice.size() == 1) {
|
||||||
|
compression_ = static_cast<CompressionType>(*slice.data());
|
||||||
|
} else {
|
||||||
|
return Status::Corruption(kErrorMessage, "Corrupted blob offset");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void EncodeInlinedTTL(std::string* dst, uint64_t expiration,
|
||||||
|
const Slice& value) {
|
||||||
|
assert(dst != nullptr);
|
||||||
|
dst->clear();
|
||||||
|
dst->reserve(1 + kMaxVarint64Length + value.size());
|
||||||
|
dst->push_back(static_cast<char>(Type::kInlinedTTL));
|
||||||
|
PutVarint64(dst, expiration);
|
||||||
|
dst->append(value.data(), value.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
static void EncodeBlob(std::string* dst, uint64_t file_number,
|
||||||
|
uint64_t offset, uint64_t size,
|
||||||
|
CompressionType compression) {
|
||||||
|
assert(dst != nullptr);
|
||||||
|
dst->clear();
|
||||||
|
dst->reserve(kMaxVarint64Length * 3 + 2);
|
||||||
|
dst->push_back(static_cast<char>(Type::kBlob));
|
||||||
|
PutVarint64(dst, file_number);
|
||||||
|
PutVarint64(dst, offset);
|
||||||
|
PutVarint64(dst, size);
|
||||||
|
dst->push_back(static_cast<char>(compression));
|
||||||
|
}
|
||||||
|
|
||||||
|
static void EncodeBlobTTL(std::string* dst, uint64_t expiration,
|
||||||
|
uint64_t file_number, uint64_t offset,
|
||||||
|
uint64_t size, CompressionType compression) {
|
||||||
|
assert(dst != nullptr);
|
||||||
|
dst->clear();
|
||||||
|
dst->reserve(kMaxVarint64Length * 4 + 2);
|
||||||
|
dst->push_back(static_cast<char>(Type::kBlobTTL));
|
||||||
|
PutVarint64(dst, expiration);
|
||||||
|
PutVarint64(dst, file_number);
|
||||||
|
PutVarint64(dst, offset);
|
||||||
|
PutVarint64(dst, size);
|
||||||
|
dst->push_back(static_cast<char>(compression));
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Type type_ = Type::kUnknown;
|
||||||
|
uint64_t expiration_ = 0;
|
||||||
|
Slice value_;
|
||||||
|
uint64_t file_number_ = 0;
|
||||||
|
uint64_t offset_ = 0;
|
||||||
|
uint64_t size_ = 0;
|
||||||
|
CompressionType compression_ = kNoCompression;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace blob_db
|
||||||
|
} // namespace rocksdb
|
||||||
|
#endif // ROCKSDB_LITE
|
Loading…
Reference in New Issue
Block a user