diff --git a/db/db_impl.cc b/db/db_impl.cc index 8e9754320..0bf425afb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1588,7 +1588,7 @@ bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) { if (snapshots_.empty()) { return false; } - return (snapshots_.newest()->GetSequenceNumber() > sn); + return (snapshots_.newest()->GetSequenceNumber() >= sn); } #ifndef ROCKSDB_LITE diff --git a/include/rocksdb/utilities/debug.h b/include/rocksdb/utilities/debug.h index 3e325f69a..bc5b9bf03 100644 --- a/include/rocksdb/utilities/debug.h +++ b/include/rocksdb/utilities/debug.h @@ -16,6 +16,8 @@ namespace rocksdb { // store multiple versions of a same user key due to snapshots, compaction not // happening yet, etc. struct KeyVersion { + KeyVersion() : user_key(""), value(""), sequence(0), type(0) {} + KeyVersion(const std::string& _user_key, const std::string& _value, SequenceNumber _sequence, int _type) : user_key(_user_key), value(_value), sequence(_sequence), type(_type) {} diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 67463d07b..76ab95555 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -52,6 +52,10 @@ struct BlobDBOptions { // and so on uint64_t ttl_range_secs = 3600; + // The smallest value to store in blob log. Values smaller than this threshold + // will be inlined in base DB together with the key. + uint64_t min_blob_size = 0; + // at what bytes will the blob files be synced to blob log. 
uint64_t bytes_per_sync = 0; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index d04b87469..1b915420f 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -33,6 +33,7 @@ #include "util/sync_point.h" #include "util/timer_queue.h" #include "utilities/blob_db/blob_db_iterator.h" +#include "utilities/blob_db/blob_index.h" namespace { int kBlockBasedTableVersionFormat = 2; @@ -49,78 +50,8 @@ void extendTimestamps(rocksdb::blob_db::tsrange_t* ts_range, uint64_t ts) { } // end namespace namespace rocksdb { - namespace blob_db { -// BlobHandle is a pointer to the blob that is stored in the LSM -class BlobHandle { - public: - BlobHandle() - : file_number_(std::numeric_limits::max()), - offset_(std::numeric_limits::max()), - size_(std::numeric_limits::max()), - compression_(kNoCompression) {} - - uint64_t filenumber() const { return file_number_; } - void set_filenumber(uint64_t fn) { file_number_ = fn; } - - // The offset of the block in the file. 
- uint64_t offset() const { return offset_; } - void set_offset(uint64_t _offset) { offset_ = _offset; } - - // The size of the stored block - uint64_t size() const { return size_; } - void set_size(uint64_t _size) { size_ = _size; } - - CompressionType compression() const { return compression_; } - void set_compression(CompressionType t) { compression_ = t; } - - void EncodeTo(std::string* dst) const; - - Status DecodeFrom(const Slice& input); - - void clear(); - - private: - uint64_t file_number_; - uint64_t offset_; - uint64_t size_; - CompressionType compression_; -}; - -void BlobHandle::EncodeTo(std::string* dst) const { - // Sanity check that all fields have been set - assert(offset_ != std::numeric_limits::max()); - assert(size_ != std::numeric_limits::max()); - assert(file_number_ != std::numeric_limits::max()); - - dst->reserve(30); - PutVarint64(dst, file_number_); - PutVarint64(dst, offset_); - PutVarint64(dst, size_); - dst->push_back(static_cast(compression_)); -} - -void BlobHandle::clear() { - file_number_ = std::numeric_limits::max(); - offset_ = std::numeric_limits::max(); - size_ = std::numeric_limits::max(); - compression_ = kNoCompression; -} - -Status BlobHandle::DecodeFrom(const Slice& input) { - Slice s(input); - Slice* p = &s; - if (GetVarint64(p, &file_number_) && GetVarint64(p, &offset_) && - GetVarint64(p, &size_)) { - compression_ = static_cast(p->data()[0]); - return Status::OK(); - } else { - clear(); - return Status::Corruption("bad blob handle"); - } -} - Random blob_rgen(static_cast(time(nullptr))); void BlobDBFlushBeginListener::OnFlushBegin(DB* db, const FlushJobInfo& info) { @@ -149,19 +80,20 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction( if (!is_new && value_type == CompactionEventListener::CompactionListenerValueType::kValue) { - BlobHandle handle; - Status s = handle.DecodeFrom(existing_value); + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(existing_value); if (s.ok()) { if 
(impl_->debug_level_ >= 3) - ROCKS_LOG_INFO(impl_->db_options_.info_log, - "CALLBACK COMPACTED OUT KEY: %s SN: %d " - "NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64 - " SIZE: %" PRIu64, - key.ToString().c_str(), sn, is_new, handle.filenumber(), - handle.offset(), handle.size()); + ROCKS_LOG_INFO( + impl_->db_options_.info_log, + "CALLBACK COMPACTED OUT KEY: %s SN: %d " + "NEW: %d FN: %" PRIu64 " OFFSET: %" PRIu64 " SIZE: %" PRIu64, + key.ToString().c_str(), sn, is_new, blob_index.file_number(), + blob_index.offset(), blob_index.size()); - impl_->override_vals_q_.enqueue({handle.filenumber(), key.size(), - handle.offset(), handle.size(), sn}); + impl_->override_vals_q_.enqueue({blob_index.file_number(), key.size(), + blob_index.offset(), blob_index.size(), + sn}); } } else { if (impl_->debug_level_ >= 3) @@ -178,7 +110,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, db_impl_(nullptr), env_(db_options.env), ttl_extractor_(blob_db_options.ttl_extractor.get()), - wo_set_(false), bdb_options_(blob_db_options), db_options_(db_options), env_options_(db_options), @@ -235,7 +166,6 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options) : BlobDB(db), db_impl_(static_cast_with_check(db)), - wo_set_(false), bdb_options_(blob_db_options), db_options_(db->GetOptions()), env_options_(db_->GetOptions()), @@ -610,17 +540,6 @@ std::shared_ptr BlobDBImpl::CheckOrCreateWriterLocked( return writer; } -void BlobDBImpl::UpdateWriteOptions(const WriteOptions& options) { - if (!wo_set_.load(std::memory_order_relaxed)) { - // DCLP - WriteLock wl(&mutex_); - if (!wo_set_.load(std::memory_order_acquire)) { - wo_set_.store(true, std::memory_order_release); - write_options_ = options; - } - } -} - std::shared_ptr BlobDBImpl::SelectBlobFile() { uint32_t val = blob_rgen.Next(); { @@ -736,14 +655,6 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { return bfile; } -Status 
BlobDBImpl::Put(const WriteOptions& options, const Slice& key, - const Slice& value) { - std::string new_value; - Slice value_slice; - uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value); - return PutUntil(options, key, value_slice, expiration); -} - Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) { SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); Status s = db_->Delete(options, key); @@ -753,141 +664,94 @@ Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) { return s; } -Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { - class BlobInserter : public WriteBatch::Handler { - private: - BlobDBImpl* impl_; - SequenceNumber sequence_; - WriteBatch updates_blob_; - std::shared_ptr last_file_; - bool has_put_; - std::string new_value_; - uint32_t default_cf_id_; +class BlobDBImpl::BlobInserter : public WriteBatch::Handler { + private: + const WriteOptions& options_; + BlobDBImpl* blob_db_impl_; + uint32_t default_cf_id_; + SequenceNumber sequence_; + WriteBatch batch_; - public: - BlobInserter(BlobDBImpl* impl, SequenceNumber seq) - : impl_(impl), - sequence_(seq), - has_put_(false), - default_cf_id_(reinterpret_cast( - impl_->DefaultColumnFamily()) - ->cfd() - ->GetID()) {} + public: + BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl, + uint32_t default_cf_id, SequenceNumber seq) + : options_(options), + blob_db_impl_(blob_db_impl), + default_cf_id_(default_cf_id), + sequence_(seq) {} - SequenceNumber sequence() { return sequence_; } + SequenceNumber sequence() { return sequence_; } - WriteBatch* updates_blob() { return &updates_blob_; } + WriteBatch* batch() { return &batch_; } - std::shared_ptr& last_file() { return last_file_; } - - bool has_put() { return has_put_; } - - virtual Status PutCF(uint32_t column_family_id, const Slice& key, - const Slice& value_slice) override { - if (column_family_id != default_cf_id_) { - return 
Status::NotSupported( - "Blob DB doesn't support non-default column family."); - } - Slice value_unc; - uint64_t expiration = - impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_); - - std::shared_ptr bfile = - (expiration != kNoExpiration) - ? impl_->SelectBlobFileTTL(expiration) - : ((last_file_) ? last_file_ : impl_->SelectBlobFile()); - if (last_file_ && last_file_ != bfile) { - return Status::NotFound("too many blob files"); - } - - if (!bfile) { - return Status::NotFound("blob file not found"); - } - - last_file_ = bfile; - has_put_ = true; - - std::string compression_output; - Slice value = impl_->GetCompressedSlice(value_unc, &compression_output); - - std::string headerbuf; - Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); - std::string index_entry; - Status s = impl_->AppendBlob(bfile, headerbuf, key, value, &index_entry); - if (!s.ok()) { - return s; - } - bfile->ExtendSequenceRange(sequence_); - sequence_++; - - if (expiration != kNoExpiration) { - extendTTL(&(bfile->ttl_range_), expiration); - } - - return WriteBatchInternal::PutBlobIndex(&updates_blob_, column_family_id, - key, index_entry); + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + if (column_family_id != default_cf_id_) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); } + std::string new_value; + Slice value_slice; + uint64_t expiration = + blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value); + Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice, + expiration, sequence_, &batch_); + sequence_++; + return s; + } - virtual Status DeleteCF(uint32_t column_family_id, - const Slice& key) override { - if (column_family_id != default_cf_id_) { - return Status::NotSupported( - "Blob DB doesn't support non-default column family."); - } - WriteBatchInternal::Delete(&updates_blob_, column_family_id, key); - sequence_++; - return Status::OK(); 
+ virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + if (column_family_id != default_cf_id_) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); } + Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key); + sequence_++; + return s; + } - virtual Status DeleteRange(uint32_t column_family_id, - const Slice& begin_key, const Slice& end_key) { - if (column_family_id != default_cf_id_) { - return Status::NotSupported( - "Blob DB doesn't support non-default column family."); - } - WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id, - begin_key, end_key); - sequence_++; - return Status::OK(); + virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key, + const Slice& end_key) { + if (column_family_id != default_cf_id_) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); } + Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id, + begin_key, end_key); + sequence_++; + return s; + } - virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, - const Slice& /*key*/) override { - return Status::NotSupported("Not supported operation in blob db."); - } + virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } - virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/, - const Slice& /*value*/) override { - return Status::NotSupported("Not supported operation in blob db."); - } + virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/, + const Slice& /*value*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } - virtual void LogData(const Slice& blob) override { - updates_blob_.PutLogData(blob); - } - }; + virtual void LogData(const Slice& blob) override { batch_.PutLogData(blob); } +}; +Status BlobDBImpl::Write(const 
WriteOptions& options, WriteBatch* updates) { MutexLock l(&write_mutex_); - SequenceNumber current_seq = db_impl_->GetLatestSequenceNumber() + 1; - BlobInserter blob_inserter(this, current_seq); + uint32_t default_cf_id = + reinterpret_cast(DefaultColumnFamily())->GetID(); + SequenceNumber current_seq = GetLatestSequenceNumber() + 1; + BlobInserter blob_inserter(options, this, default_cf_id, current_seq); Status s = updates->Iterate(&blob_inserter); if (!s.ok()) { return s; } - s = db_->Write(opts, blob_inserter.updates_blob()); + s = db_->Write(options, blob_inserter.batch()); if (!s.ok()) { return s; } - assert(current_seq == - WriteBatchInternal::Sequence(blob_inserter.updates_blob())); - assert(blob_inserter.sequence() == - current_seq + WriteBatchInternal::Count(blob_inserter.updates_blob())); - if (blob_inserter.has_put()) { - s = CloseBlobFileIfNeeded(blob_inserter.last_file()); - if (!s.ok()) { - return s; - } - } + assert(blob_inserter.sequence() == GetLatestSequenceNumber() + 1); // add deleted key to list of keys that have been deleted for book-keeping class DeleteBookkeeper : public WriteBatch::Handler { @@ -956,12 +820,92 @@ void BlobDBImpl::GetLiveFilesMetaData(std::vector* metadata) { } } +Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key, + const Slice& value) { + std::string new_value; + Slice value_slice; + uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value); + return PutUntil(options, key, value_slice, expiration); +} + Status BlobDBImpl::PutWithTTL(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) { uint64_t now = EpochNow(); - assert(std::numeric_limits::max() - now > ttl); - return PutUntil(options, key, value, now + ttl); + uint64_t expiration = kNoExpiration - now > ttl ? 
now + ttl : kNoExpiration; + return PutUntil(options, key, value, expiration); +} + +Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) { + MutexLock l(&write_mutex_); + SequenceNumber sequence = GetLatestSequenceNumber() + 1; + WriteBatch batch; + Status s = PutBlobValue(options, key, value, expiration, sequence, &batch); + if (s.ok()) { + s = db_->Write(options, &batch); + } + return s; +} + +Status BlobDBImpl::PutBlobValue(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration, + SequenceNumber sequence, WriteBatch* batch) { + TEST_SYNC_POINT("BlobDBImpl::PutBlobValue:Start"); + Status s; + std::string index_entry; + uint32_t column_family_id = + reinterpret_cast(DefaultColumnFamily())->GetID(); + if (value.size() < bdb_options_.min_blob_size) { + if (expiration == kNoExpiration) { + // Put as normal value + s = batch->Put(key, value); + } else { + // Inlined with TTL + BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value); + s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, + index_entry); + } + } else { + std::shared_ptr bfile = (expiration != kNoExpiration) + ? 
SelectBlobFileTTL(expiration) + : SelectBlobFile(); + if (!bfile) { + return Status::NotFound("Blob file not found"); + } + + std::string compression_output; + Slice value_compressed = GetCompressedSlice(value, &compression_output); + + std::string headerbuf; + Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration, + -1); + + s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, + &index_entry); + + if (s.ok()) { + bfile->ExtendSequenceRange(sequence); + if (expiration != kNoExpiration) { + extendTTL(&(bfile->ttl_range_), expiration); + } + s = CloseBlobFileIfNeeded(bfile); + if (s.ok()) { + s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key, + index_entry); + } + } else { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to append blob to FILE: %s: KEY: %s VALSZ: %d" + " status: '%s' blob_file: '%s'", + bfile->PathName().c_str(), key.ToString().c_str(), + value.size(), s.ToString().c_str(), + bfile->DumpState().c_str()); + } + } + + TEST_SYNC_POINT("BlobDBImpl::PutBlobValue:Finish"); + return s; } Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, @@ -976,63 +920,6 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, return *compression_output; } -Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, - const Slice& value_unc, uint64_t expiration) { - TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); - MutexLock l(&write_mutex_); - UpdateWriteOptions(options); - - std::shared_ptr bfile = (expiration != kNoExpiration) - ? 
SelectBlobFileTTL(expiration) - : SelectBlobFile(); - - if (!bfile) return Status::NotFound("Blob file not found"); - - std::string compression_output; - Slice value = GetCompressedSlice(value_unc, &compression_output); - - std::string headerbuf; - Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1); - - std::string index_entry; - Status s = AppendBlob(bfile, headerbuf, key, value, &index_entry); - if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_.info_log, - "Failed to append blob to FILE: %s: KEY: %s VALSZ: %d" - " status: '%s' blob_file: '%s'", - bfile->PathName().c_str(), key.ToString().c_str(), - value.size(), s.ToString().c_str(), - bfile->DumpState().c_str()); - return s; - } - - WriteBatch batch; - uint32_t column_family_id = - reinterpret_cast(DefaultColumnFamily())->GetID(); - s = WriteBatchInternal::PutBlobIndex(&batch, column_family_id, key, - index_entry); - - // this goes to the base db and can be expensive - if (s.ok()) { - s = db_->Write(options, &batch); - } - - if (s.ok()) { - // this is the sequence number of the write. 
- SequenceNumber sn = WriteBatchInternal::Sequence(&batch); - bfile->ExtendSequenceRange(sn); - - if (expiration != kNoExpiration) { - extendTTL(&(bfile->ttl_range_), expiration); - } - - s = CloseBlobFileIfNeeded(bfile); - } - - TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish"); - return s; -} - uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value, Slice* value_slice, std::string* new_value) { @@ -1049,7 +936,8 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value, Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, - const Slice& value, std::string* index_entry) { + const Slice& value, uint64_t expiration, + std::string* index_entry) { auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size(); if (bdb_options_.blob_dir_size > 0 && (total_blob_space_.load() + size_put) > bdb_options_.blob_dir_size) { @@ -1086,18 +974,14 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, last_period_write_ += size_put; total_blob_space_ += size_put; - BlobHandle handle; - handle.set_filenumber(bfile->BlobFileNumber()); - handle.set_size(value.size()); - handle.set_offset(blob_offset); - handle.set_compression(bdb_options_.compression); - handle.EncodeTo(index_entry); - - if (debug_level_ >= 3) - ROCKS_LOG_INFO(db_options_.info_log, - ">Adding KEY FILE: %s: BC: %d OFFSET: %d SZ: %d", - bfile->PathName().c_str(), bfile->blob_count_.load(), - blob_offset, value.size()); + if (expiration == kNoExpiration) { + BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset, + value.size(), bdb_options_.compression); + } else { + BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(), + blob_offset, value.size(), + bdb_options_.compression); + } return s; } @@ -1138,29 +1022,45 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* 
value) { assert(value != nullptr); - BlobHandle handle; - Status s = handle.DecodeFrom(index_entry); - if (!s.ok()) return s; + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(index_entry); + if (!s.ok()) { + return s; + } + if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) { + return Status::NotFound("Key expired"); + } + if (blob_index.IsInlined()) { + // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same + // memory buffer to avoid extra copy. + value->PinSelf(blob_index.value()); + return Status::OK(); + } + if (blob_index.size() == 0) { + value->PinSelf(""); + return Status::OK(); + } // offset has to have certain min, as we will read CRC // later from the Blob Header, which needs to be also a // valid offset. - if (handle.offset() < + if (blob_index.offset() < (BlobLogHeader::kHeaderSize + BlobLogRecord::kHeaderSize + key.size())) { if (debug_level_ >= 2) { - ROCKS_LOG_ERROR( - db_options_.info_log, - "Invalid blob handle file_number: %" PRIu64 " blob_offset: %" PRIu64 - " blob_size: %" PRIu64 " key: %s", - handle.filenumber(), handle.offset(), handle.size(), key.data()); + ROCKS_LOG_ERROR(db_options_.info_log, + "Invalid blob index file_number: %" PRIu64 + " blob_offset: %" PRIu64 " blob_size: %" PRIu64 + " key: %s", + blob_index.file_number(), blob_index.offset(), + blob_index.size(), key.data()); } - return Status::NotFound("Blob Not Found, although found in LSM"); + return Status::NotFound("Invalid blob offset"); } std::shared_ptr bfile; { ReadLock rl(&mutex_); - auto hitr = blob_files_.find(handle.filenumber()); + auto hitr = blob_files_.find(blob_index.file_number()); // file was deleted if (hitr == blob_files_.end()) { @@ -1170,7 +1070,7 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, bfile = hitr->second; } - if (handle.size() == 0 && value != nullptr) { + if (blob_index.size() == 0 && value != nullptr) { value->PinSelf(""); return Status::OK(); } @@ -1186,19 +1086,19 @@ 
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, } // allocate the buffer. This is safe in C++11 - valueptr->resize(handle.size()); + valueptr->resize(blob_index.size()); char* buffer = &(*valueptr)[0]; Slice blob_value; - s = reader->Read(handle.offset(), handle.size(), &blob_value, buffer); - if (!s.ok() || blob_value.size() != handle.size()) { + s = reader->Read(blob_index.offset(), blob_index.size(), &blob_value, buffer); + if (!s.ok() || blob_value.size() != blob_index.size()) { if (debug_level_ >= 2) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to read blob from file: %s blob_offset: %" PRIu64 " blob_size: %" PRIu64 " read: %d key: %s status: '%s'", - bfile->PathName().c_str(), handle.offset(), handle.size(), - static_cast(blob_value.size()), key.data(), - s.ToString().c_str()); + bfile->PathName().c_str(), blob_index.offset(), + blob_index.size(), static_cast(blob_value.size()), + key.data(), s.ToString().c_str()); } return Status::NotFound("Blob Not Found as couldnt retrieve Blob"); } @@ -1208,15 +1108,15 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, std::string crc_str; crc_str.resize(sizeof(uint32_t)); char* crc_buffer = &(crc_str[0]); - s = reader->Read(handle.offset() - (key.size() + sizeof(uint32_t)), + s = reader->Read(blob_index.offset() - (key.size() + sizeof(uint32_t)), sizeof(uint32_t), &crc_slice, crc_buffer); if (!s.ok() || !GetFixed32(&crc_slice, &crc_exp)) { if (debug_level_ >= 2) { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to fetch blob crc file: %s blob_offset: %" PRIu64 " blob_size: %" PRIu64 " key: %s status: '%s'", - bfile->PathName().c_str(), handle.offset(), handle.size(), - key.data(), s.ToString().c_str()); + bfile->PathName().c_str(), blob_index.offset(), + blob_index.size(), key.data(), s.ToString().c_str()); } return Status::NotFound("Blob Not Found as couldnt retrieve CRC"); } @@ -1228,8 +1128,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& 
index_entry, ROCKS_LOG_ERROR(db_options_.info_log, "Blob crc mismatch file: %s blob_offset: %" PRIu64 " blob_size: %" PRIu64 " key: %s status: '%s'", - bfile->PathName().c_str(), handle.offset(), handle.size(), - key.data(), s.ToString().c_str()); + bfile->PathName().c_str(), blob_index.offset(), + blob_index.size(), key.data(), s.ToString().c_str()); } return Status::Corruption("Corruption. Blob CRC mismatch"); } @@ -1357,8 +1257,9 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked( SequenceNumber esn = bfile->GetSNRange().first; - // this is not correct. - // you want to check that there are no snapshots in the + // TODO(yiwu): Here we should check instead if there is an active snapshot + // lies between the first sequence in the file, and the last sequence by + // the time the file finished being garbage collect. bool notok = db_impl_->HasActiveSnapshotLaterThanSN(esn); if (notok) { ROCKS_LOG_INFO(db_options_.info_log, @@ -1398,16 +1299,16 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, } bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) { - BlobHandle handle; - Status s = handle.DecodeFrom(index_entry); + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(index_entry); if (!s.ok()) { ROCKS_LOG_INFO(db_options_.info_log, "Could not parse lsm val in MarkBlobDeleted %s", index_entry.ToString().c_str()); return false; } - bool succ = FindFileAndEvictABlob(handle.filenumber(), key.size(), - handle.offset(), handle.size()); + bool succ = FindFileAndEvictABlob(blob_index.file_number(), key.size(), + blob_index.offset(), blob_index.size()); return succ; } @@ -1756,16 +1657,16 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, continue; } - BlobHandle handle; - s = handle.DecodeFrom(index_entry); + BlobIndex blob_index; + s = blob_index.DecodeFrom(index_entry); if (!s.ok()) { ROCKS_LOG_ERROR(db_options_.info_log, "Error while decoding index entry: %s", s.ToString().c_str()); break; 
} - if (handle.filenumber() != bfptr->BlobFileNumber() || - handle.offset() != blob_offset) { + if (blob_index.file_number() != bfptr->BlobFileNumber() || + blob_index.offset() != blob_offset) { // Key has been overwritten. Drop the blob record. continue; } @@ -1842,12 +1743,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, s = new_writer->AddRecord(record.Key(), record.Blob(), &new_key_offset, &new_blob_offset, record.GetTTL()); - BlobHandle new_handle; - new_handle.set_filenumber(newfile->BlobFileNumber()); - new_handle.set_size(record.Blob().size()); - new_handle.set_offset(new_blob_offset); - new_handle.set_compression(bdb_options_.compression); - new_handle.EncodeTo(&new_index_entry); + BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(), + new_blob_offset, record.Blob().size(), + bdb_options_.compression); newfile->blob_count_++; newfile->file_size_ += @@ -2268,6 +2166,11 @@ Status DestroyBlobDB(const std::string& dbname, const Options& options, } #ifndef NDEBUG +Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value) { + return GetBlobValue(key, index_entry, value); +} + std::vector> BlobDBImpl::TEST_GetBlobFiles() const { ReadLock l(&mutex_); std::vector> blob_files; diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 6496c585d..b18d26e1f 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -215,9 +216,6 @@ class BlobDBImpl : public BlobDB { Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; - Status GetBlobValue(const Slice& key, const Slice& index_entry, - PinnableSlice* value); - using BlobDB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& read_options) override; @@ -249,7 +247,7 @@ class BlobDBImpl : public BlobDB { using 
BlobDB::PutUntil; Status PutUntil(const WriteOptions& options, const Slice& key, - const Slice& value_unc, uint64_t expiration) override; + const Slice& value, uint64_t expiration) override; Status LinkToBaseDB(DB* db) override; @@ -263,6 +261,9 @@ class BlobDBImpl : public BlobDB { ~BlobDBImpl(); #ifndef NDEBUG + Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value); + std::vector> TEST_GetBlobFiles() const; std::vector> TEST_GetObsoleteFiles() const; @@ -281,6 +282,7 @@ class BlobDBImpl : public BlobDB { private: class GarbageCollectionWriteCallback; + class BlobInserter; Status OpenPhase1(); @@ -288,6 +290,9 @@ class BlobDBImpl : public BlobDB { // Return true if a snapshot is created. bool SetSnapshotIfNeeded(ReadOptions* read_options); + Status GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value); + Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const; @@ -314,9 +319,14 @@ class BlobDBImpl : public BlobDB { uint64_t ExtractExpiration(const Slice& key, const Slice& value, Slice* value_slice, std::string* new_value); + Status PutBlobValue(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration, + SequenceNumber sequence, WriteBatch* batch); + Status AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, - const Slice& value, std::string* index_entry); + const Slice& value, uint64_t expiration, + std::string* index_entry); // find an existing blob log file based on the expiration unix epoch // if such a file does not exist, return nullptr @@ -327,8 +337,6 @@ class BlobDBImpl : public BlobDB { std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; - void UpdateWriteOptions(const WriteOptions& options); - void Shutdown(); // periodic sanity check. 
Bunch of checks @@ -426,10 +434,6 @@ class BlobDBImpl : public BlobDB { Env* env_; TTLExtractor* ttl_extractor_; - // a boolean to capture whether write_options has been set - std::atomic wo_set_; - WriteOptions write_options_; - // the options that govern the behavior of Blob Storage BlobDBOptions bdb_options_; DBOptions db_options_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 592ee609c..6f16e5b3d 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -5,19 +5,24 @@ #ifndef ROCKSDB_LITE -#include "utilities/blob_db/blob_db.h" +#include #include #include #include #include +#include + #include "db/db_test_util.h" #include "port/port.h" +#include "rocksdb/utilities/debug.h" #include "util/cast_util.h" #include "util/random.h" #include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" +#include "utilities/blob_db/blob_db.h" #include "utilities/blob_db/blob_db_impl.h" +#include "utilities/blob_db/blob_index.h" namespace rocksdb { namespace blob_db { @@ -26,6 +31,12 @@ class BlobDBTest : public testing::Test { public: const int kMaxBlobSize = 1 << 14; + struct BlobRecord { + std::string key; + std::string value; + uint64_t expiration = 0; + }; + BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), mock_env_(new MockTimeEnv(Env::Default())), @@ -127,6 +138,32 @@ class BlobDBTest : public testing::Test { delete iter; } + void VerifyBaseDB( + const std::map &expected_versions) { + auto *bdb_impl = static_cast(blob_db_); + DB *db = blob_db_->GetRootDB(); + std::vector versions; + GetAllKeyVersions(db, "", "", &versions); + ASSERT_EQ(expected_versions.size(), versions.size()); + size_t i = 0; + for (auto &key_version : expected_versions) { + const KeyVersion &expected_version = key_version.second; + ASSERT_EQ(expected_version.user_key, versions[i].user_key); + ASSERT_EQ(expected_version.sequence, versions[i].sequence); + ASSERT_EQ(expected_version.type, 
versions[i].type); + if (versions[i].type == kTypeValue) { + ASSERT_EQ(expected_version.value, versions[i].value); + } else { + ASSERT_EQ(kTypeBlobIndex, versions[i].type); + PinnableSlice value; + ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key, + versions[i].value, &value)); + ASSERT_EQ(expected_version.value, value.ToString()); + } + i++; + } + } + void InsertBlobs() { WriteOptions wo; std::string value; @@ -151,6 +188,7 @@ class BlobDBTest : public testing::Test { TEST_F(BlobDBTest, Put) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::map data; @@ -166,6 +204,7 @@ TEST_F(BlobDBTest, PutWithTTL) { options.env = mock_env_.get(); BlobDBOptions bdb_options; bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; bdb_options.disable_background_tasks = true; Open(bdb_options, options); @@ -195,6 +234,7 @@ TEST_F(BlobDBTest, PutUntil) { options.env = mock_env_.get(); BlobDBOptions bdb_options; bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; bdb_options.disable_background_tasks = true; Open(bdb_options, options); @@ -226,6 +266,7 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { options.env = mock_env_.get(); BlobDBOptions bdb_options; bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; bdb_options.num_concurrent_simple_blobs = 1; bdb_options.ttl_extractor = ttl_extractor_; @@ -275,6 +316,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { options.env = mock_env_.get(); BlobDBOptions bdb_options; bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; @@ -322,6 +364,7 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { 
options.env = mock_env_.get(); BlobDBOptions bdb_options; bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; @@ -369,6 +412,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { options.env = mock_env_.get(); BlobDBOptions bdb_options; bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; bdb_options.blob_file_size = 256 * 1000 * 1000; bdb_options.ttl_extractor = std::make_shared(); bdb_options.disable_background_tasks = true; @@ -403,6 +447,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { TEST_F(BlobDBTest, StackableDBGet) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::map data; @@ -425,6 +470,7 @@ TEST_F(BlobDBTest, StackableDBGet) { TEST_F(BlobDBTest, WriteBatch) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::map data; @@ -441,6 +487,7 @@ TEST_F(BlobDBTest, WriteBatch) { TEST_F(BlobDBTest, Delete) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::map data; @@ -456,6 +503,7 @@ TEST_F(BlobDBTest, Delete) { TEST_F(BlobDBTest, DeleteBatch) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); for (size_t i = 0; i < 100; i++) { @@ -473,6 +521,7 @@ TEST_F(BlobDBTest, DeleteBatch) { TEST_F(BlobDBTest, Override) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::map data; @@ -490,6 +539,7 @@ TEST_F(BlobDBTest, Override) { TEST_F(BlobDBTest, Compression) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 
0; bdb_options.disable_background_tasks = true; bdb_options.compression = CompressionType::kSnappyCompression; Open(bdb_options); @@ -541,6 +591,7 @@ TEST_F(BlobDBTest, MultipleWriters) { TEST_F(BlobDBTest, GCAfterOverwriteKeys) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); BlobDBImpl *blob_db_impl = @@ -580,6 +631,7 @@ TEST_F(BlobDBTest, GCAfterOverwriteKeys) { TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1")); @@ -591,8 +643,8 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { SyncPoint::GetInstance()->LoadDependency( {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", - "BlobDBImpl::PutUntil:Start"}, - {"BlobDBImpl::PutUntil:Finish", + "BlobDBImpl::PutBlobValue:Start"}, + {"BlobDBImpl::PutBlobValue:Finish", "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}}); SyncPoint::GetInstance()->EnableProcessing(); @@ -615,6 +667,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { Options options; options.env = mock_env_.get(); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options, options); mock_env_->set_current_time(100); @@ -628,8 +681,8 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { SyncPoint::GetInstance()->LoadDependency( {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", - "BlobDBImpl::PutUntil:Start"}, - {"BlobDBImpl::PutUntil:Finish", + "BlobDBImpl::PutBlobValue:Start"}, + {"BlobDBImpl::PutBlobValue:Finish", "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}}); SyncPoint::GetInstance()->EnableProcessing(); @@ -656,6 +709,7 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) { bdb_options.is_fifo = true; bdb_options.blob_dir_size = 100; bdb_options.blob_file_size = 100; + 
bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::string value(100, 'v'); @@ -687,6 +741,7 @@ TEST_F(BlobDBTest, ReadWhileGC) { // run the same test for Get(), MultiGet() and Iterator each. for (int i = 0; i < 2; i++) { BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); blob_db_->Put(WriteOptions(), "foo", "bar"); @@ -798,6 +853,7 @@ TEST_F(BlobDBTest, ColumnFamilyNotSupported) { TEST_F(BlobDBTest, GetLiveFilesMetaData) { Random rnd(301); BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; Open(bdb_options); std::map data; @@ -894,6 +950,75 @@ TEST_F(BlobDBTest, OutOfSpace) { ASSERT_TRUE(s.IsNoSpace()); } +TEST_F(BlobDBTest, InlineSmallValues) { + constexpr uint64_t kMaxExpiration = 1000; + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = kMaxExpiration; + bdb_options.min_blob_size = 100; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Options options; + options.env = mock_env_.get(); + mock_env_->set_current_time(0); + Open(bdb_options, options); + std::map data; + std::map versions; + SequenceNumber first_non_ttl_seq = kMaxSequenceNumber; + SequenceNumber first_ttl_seq = kMaxSequenceNumber; + SequenceNumber last_non_ttl_seq = 0; + SequenceNumber last_ttl_seq = 0; + for (size_t i = 0; i < 1000; i++) { + bool is_small_value = rnd.Next() % 2; + bool has_ttl = rnd.Next() % 2; + uint64_t expiration = rnd.Next() % kMaxExpiration; + int len = is_small_value ? 
50 : 200; + std::string key = "key" + ToString(i); + std::string value = test::RandomHumanReadableString(&rnd, len); + std::string blob_index; + data[key] = value; + SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + if (!has_ttl) { + ASSERT_OK(blob_db_->Put(WriteOptions(), key, value)); + } else { + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration)); + } + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + versions[key] = + KeyVersion(key, value, sequence, + (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex); + if (!is_small_value) { + if (!has_ttl) { + first_non_ttl_seq = std::min(first_non_ttl_seq, sequence); + last_non_ttl_seq = std::max(last_non_ttl_seq, sequence); + } else { + first_ttl_seq = std::min(first_ttl_seq, sequence); + last_ttl_seq = std::max(last_ttl_seq, sequence); + } + } + } + VerifyDB(data); + VerifyBaseDB(versions); + auto *bdb_impl = static_cast(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + std::shared_ptr non_ttl_file; + std::shared_ptr ttl_file; + if (blob_files[0]->HasTTL()) { + ttl_file = blob_files[0]; + non_ttl_file = blob_files[1]; + } else { + non_ttl_file = blob_files[0]; + ttl_file = blob_files[1]; + } + ASSERT_FALSE(non_ttl_file->HasTTL()); + ASSERT_EQ(first_non_ttl_seq, non_ttl_file->GetSNRange().first); + ASSERT_EQ(last_non_ttl_seq, non_ttl_file->GetSNRange().second); + ASSERT_TRUE(ttl_file->HasTTL()); + ASSERT_EQ(first_ttl_seq, ttl_file->GetSNRange().first); + ASSERT_EQ(last_ttl_seq, ttl_file->GetSNRange().second); +} + } // namespace blob_db } // namespace rocksdb diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index b247a69f3..9989bacf3 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -15,6 +15,7 @@ #include #include +#include "db/dbformat.h" #include "util/filename.h" #include "util/logging.h" #include "utilities/blob_db/blob_db_impl.h" @@ -36,7 +37,7 @@ 
BlobFile::BlobFile() gc_once_after_open_(false), ttl_range_(std::make_pair(0, 0)), time_range_(std::make_pair(0, 0)), - sn_range_(std::make_pair(0, 0)), + sn_range_(std::make_pair(kMaxSequenceNumber, 0)), last_access_(-1), last_fsync_(0), header_valid_(false) {} @@ -55,7 +56,7 @@ BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn) gc_once_after_open_(false), ttl_range_(std::make_pair(0, 0)), time_range_(std::make_pair(0, 0)), - sn_range_(std::make_pair(0, 0)), + sn_range_(std::make_pair(kMaxSequenceNumber, 0)), last_access_(-1), last_fsync_(0), header_valid_(false) {} diff --git a/utilities/blob_db/blob_index.h b/utilities/blob_db/blob_index.h new file mode 100644 index 000000000..fd91b547a --- /dev/null +++ b/utilities/blob_db/blob_index.h @@ -0,0 +1,161 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/options.h" +#include "util/coding.h" +#include "util/string_util.h" + +namespace rocksdb { +namespace blob_db { + +// BlobIndex is a pointer to the blob and metadata of the blob. The index is +// stored in base DB as ValueType::kTypeBlobIndex. 
+// There are three types of blob index: +// +// kInlinedTTL: +// +------+------------+---------------+ +// | type | expiration | value | +// +------+------------+---------------+ +// | char | varint64 | variable size | +// +------+------------+---------------+ +// +// kBlob: +// +------+-------------+----------+----------+-------------+ +// | type | file number | offset | size | compression | +// +------+-------------+----------+----------+-------------+ +// | char | varint64 | varint64 | varint64 | char | +// +------+-------------+----------+----------+-------------+ +// +// kBlobTTL: +// +------+------------+-------------+----------+----------+-------------+ +// | type | expiration | file number | offset | size | compression | +// +------+------------+-------------+----------+----------+-------------+ +// | char | varint64 | varint64 | varint64 | varint64 | char | +// +------+------------+-------------+----------+----------+-------------+ +// +// There isn't a kInlined (without TTL) type since we can store it as a plain +// value (i.e. ValueType::kTypeValue). 
+class BlobIndex { + public: + enum class Type : unsigned char { + kInlinedTTL = 0, + kBlob = 1, + kBlobTTL = 2, + kUnknown = 3, + }; + + BlobIndex() : type_(Type::kUnknown) {} + + bool IsInlined() const { return type_ == Type::kInlinedTTL; } + + bool HasTTL() const { + return type_ == Type::kInlinedTTL || type_ == Type::kBlobTTL; + } + + uint64_t expiration() const { + assert(HasTTL()); + return expiration_; + } + + const Slice& value() const { + assert(IsInlined()); + return value_; + } + + uint64_t file_number() const { + assert(!IsInlined()); + return file_number_; + } + + uint64_t offset() const { + assert(!IsInlined()); + return offset_; + } + + uint64_t size() const { + assert(!IsInlined()); + return size_; + } + + Status DecodeFrom(Slice slice) { + static const std::string kErrorMessage = "Error while decoding blob index"; + assert(slice.size() > 0); + type_ = static_cast(*slice.data()); + if (type_ >= Type::kUnknown) { + return Status::Corruption( + kErrorMessage, + "Unknown blob index type: " + ToString(static_cast(type_))); + } + slice = Slice(slice.data() + 1, slice.size() - 1); + if (HasTTL()) { + if (!GetVarint64(&slice, &expiration_)) { + return Status::Corruption(kErrorMessage, "Corrupted expiration"); + } + } + if (IsInlined()) { + value_ = slice; + } else { + if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) && + GetVarint64(&slice, &size_) && slice.size() == 1) { + compression_ = static_cast(*slice.data()); + } else { + return Status::Corruption(kErrorMessage, "Corrupted blob offset"); + } + } + return Status::OK(); + } + + static void EncodeInlinedTTL(std::string* dst, uint64_t expiration, + const Slice& value) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(1 + kMaxVarint64Length + value.size()); + dst->push_back(static_cast(Type::kInlinedTTL)); + PutVarint64(dst, expiration); + dst->append(value.data(), value.size()); + } + + static void EncodeBlob(std::string* dst, uint64_t file_number, + uint64_t offset, 
uint64_t size, + CompressionType compression) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(kMaxVarint64Length * 3 + 2); + dst->push_back(static_cast(Type::kBlob)); + PutVarint64(dst, file_number); + PutVarint64(dst, offset); + PutVarint64(dst, size); + dst->push_back(static_cast(compression)); + } + + static void EncodeBlobTTL(std::string* dst, uint64_t expiration, + uint64_t file_number, uint64_t offset, + uint64_t size, CompressionType compression) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(kMaxVarint64Length * 4 + 2); + dst->push_back(static_cast(Type::kBlobTTL)); + PutVarint64(dst, expiration); + PutVarint64(dst, file_number); + PutVarint64(dst, offset); + PutVarint64(dst, size); + dst->push_back(static_cast(compression)); + } + + private: + Type type_ = Type::kUnknown; + uint64_t expiration_ = 0; + Slice value_; + uint64_t file_number_ = 0; + uint64_t offset_ = 0; + uint64_t size_ = 0; + CompressionType compression_ = kNoCompression; +}; + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE