Blob DB: Store blob index as kTypeBlobIndex in base db

Summary:
Blob db insert blob index to base db as kTypeBlobIndex type, to tell apart values written by plain rocksdb or blob db. This is to make it possible to migrate from existing rocksdb to blob db.

Also with the patch blob db garbage collection get away from OptimisticTransaction. Instead it use a custom write callback to achieve similar behavior as OptimisticTransaction. This is because we need to pass the is_blob_index flag to DBImpl::Get but OptimisticTransaction don't support it.
Closes https://github.com/facebook/rocksdb/pull/3000

Differential Revision: D6050044

Pulled By: yiwu-arbug

fbshipit-source-id: 61dc72ab9977625e75f78cd968e7d8a3976e3632
This commit is contained in:
Yi Wu 2017-10-17 17:24:25 -07:00 committed by Facebook Github Bot
parent 0552029b5c
commit eaaef91178
8 changed files with 378 additions and 241 deletions

View File

@ -2556,7 +2556,8 @@ SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
#ifndef ROCKSDB_LITE
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only, SequenceNumber* seq,
bool* found_record_for_key) {
bool* found_record_for_key,
bool* is_blob_index) {
Status s;
MergeContext merge_context;
RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(),
@ -2571,7 +2572,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// Check if there is a record for this key in the latest memtable
sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
read_options);
read_options, nullptr /*read_callback*/, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -2590,7 +2591,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// Check if there is a record for this key in the immutable memtables
sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
read_options);
read_options, nullptr /*read_callback*/, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -2609,7 +2610,7 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// Check if there is a record for this key in the immutable memtables
sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg,
seq, read_options);
seq, read_options, is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading memtable.
@ -2633,7 +2634,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
// Check tables
sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
&range_del_agg, nullptr /* value_found */,
found_record_for_key, seq);
found_record_for_key, seq, nullptr /*read_callback*/,
is_blob_index);
if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
// unexpected error reading SST files

View File

@ -96,6 +96,14 @@ class DBImpl : public DB {
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
using DB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
@ -295,7 +303,8 @@ class DBImpl : public DB {
// TODO(andrewkr): this API need to be aware of range deletion operations
Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
bool cache_only, SequenceNumber* seq,
bool* found_record_for_key);
bool* found_record_for_key,
bool* is_blob_index = nullptr);
using DB::IngestExternalFile;
virtual Status IngestExternalFile(
@ -1272,13 +1281,6 @@ class DBImpl : public DB {
#endif // ROCKSDB_LITE
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value,
bool* value_found = nullptr, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr);
bool GetIntPropertyInternal(ColumnFamilyData* cfd,
const DBPropertyInfo& property_info,
bool is_locked, uint64_t* value);

View File

@ -109,14 +109,13 @@ bool MemTableListVersion::Get(const LookupKey& key, std::string* value,
seq, read_opts, callback, is_blob_index);
}
bool MemTableListVersion::GetFromHistory(const LookupKey& key,
std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
SequenceNumber* seq,
const ReadOptions& read_opts) {
bool MemTableListVersion::GetFromHistory(
const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context, RangeDelAggregator* range_del_agg,
SequenceNumber* seq, const ReadOptions& read_opts, bool* is_blob_index) {
return GetFromList(&memlist_history_, key, value, s, merge_context,
range_del_agg, seq, read_opts);
range_del_agg, seq, read_opts, nullptr /*read_callback*/,
is_blob_index);
}
bool MemTableListVersion::GetFromList(

View File

@ -73,14 +73,16 @@ class MemTableListVersion {
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg, SequenceNumber* seq,
const ReadOptions& read_opts);
const ReadOptions& read_opts,
bool* is_blob_index = nullptr);
bool GetFromHistory(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
RangeDelAggregator* range_del_agg,
const ReadOptions& read_opts) {
const ReadOptions& read_opts,
bool* is_blob_index = nullptr) {
SequenceNumber seq;
return GetFromHistory(key, value, s, merge_context, range_del_agg, &seq,
read_opts);
read_opts, is_blob_index);
}
Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,

View File

@ -32,8 +32,7 @@
#include "util/random.h"
#include "util/sync_point.h"
#include "util/timer_queue.h"
#include "utilities/transactions/optimistic_transaction.h"
#include "utilities/transactions/optimistic_transaction_db_impl.h"
#include "utilities/blob_db/blob_db_iterator.h"
namespace {
int kBlockBasedTableVersionFormat = 2;
@ -78,7 +77,7 @@ class BlobHandle {
void EncodeTo(std::string* dst) const;
Status DecodeFrom(Slice* input);
Status DecodeFrom(const Slice& input);
void clear();
@ -109,10 +108,12 @@ void BlobHandle::clear() {
compression_ = kNoCompression;
}
Status BlobHandle::DecodeFrom(Slice* input) {
if (GetVarint64(input, &file_number_) && GetVarint64(input, &offset_) &&
GetVarint64(input, &size_)) {
compression_ = static_cast<CompressionType>(input->data()[0]);
Status BlobHandle::DecodeFrom(const Slice& input) {
Slice s(input);
Slice* p = &s;
if (GetVarint64(p, &file_number_) && GetVarint64(p, &offset_) &&
GetVarint64(p, &size_)) {
compression_ = static_cast<CompressionType>(p->data()[0]);
return Status::OK();
} else {
clear();
@ -149,8 +150,7 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction(
value_type ==
CompactionEventListener::CompactionListenerValueType::kValue) {
BlobHandle handle;
Slice lsmval(existing_value);
Status s = handle.DecodeFrom(&lsmval);
Status s = handle.DecodeFrom(existing_value);
if (s.ok()) {
if (impl_->debug_level_ >= 3)
ROCKS_LOG_INFO(impl_->db_options_.info_log,
@ -211,8 +211,6 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) {
env_ = db_->GetEnv();
opt_db_.reset(new OptimisticTransactionDBImpl(db, false));
Status s = env_->CreateDirIfMissing(blob_dir_);
if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
@ -237,7 +235,6 @@ BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
: BlobDB(db),
db_impl_(static_cast_with_check<DBImpl, DB>(db)),
opt_db_(new OptimisticTransactionDBImpl(db, false)),
wo_set_(false),
bdb_options_(blob_db_options),
db_options_(db->GetOptions()),
@ -827,8 +824,8 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
extendTTL(&(bfile->ttl_range_), expiration);
}
return WriteBatchInternal::Put(&updates_blob_, column_family_id, key,
index_entry);
return WriteBatchInternal::PutBlobIndex(&updates_blob_, column_family_id,
key, index_entry);
}
virtual Status DeleteCF(uint32_t column_family_id,
@ -997,18 +994,6 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
std::string headerbuf;
Writer::ConstructBlobHeader(&headerbuf, key, value, expiration, -1);
// this is another more safer way to do it, where you keep the writeLock
// for the entire write path. this will increase latency and reduce
// throughput
// WriteLock lockbfile_w(&bfile->mutex_);
// std::shared_ptr<Writer> writer =
// CheckOrCreateWriterLocked(bfile);
if (debug_level_ >= 3)
ROCKS_LOG_DEBUG(
db_options_.info_log, ">Adding KEY FILE: %s: KEY: %s VALSZ: %d",
bfile->PathName().c_str(), key.ToString().c_str(), value.size());
std::string index_entry;
Status s = AppendBlob(bfile, headerbuf, key, value, &index_entry);
if (!s.ok()) {
@ -1022,20 +1007,25 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
}
WriteBatch batch;
batch.Put(key, index_entry);
uint32_t column_family_id =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
s = WriteBatchInternal::PutBlobIndex(&batch, column_family_id, key,
index_entry);
// this goes to the base db and can be expensive
s = db_->Write(options, &batch);
// this is the sequence number of the write.
SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
bfile->ExtendSequenceRange(sn);
if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration);
if (s.ok()) {
s = db_->Write(options, &batch);
}
if (s.ok()) {
// this is the sequence number of the write.
SequenceNumber sn = WriteBatchInternal::Sequence(&batch);
bfile->ExtendSequenceRange(sn);
if (expiration != kNoExpiration) {
extendTTL(&(bfile->ttl_range_), expiration);
}
s = CloseBlobFileIfNeeded(bfile);
}
@ -1112,21 +1102,16 @@ std::vector<Status> BlobDBImpl::MultiGet(
// fetch and index entry and reading from the file.
ReadOptions ro(read_options);
bool snapshot_created = SetSnapshotIfNeeded(&ro);
std::vector<std::string> values_lsm;
values_lsm.resize(keys.size());
auto statuses = db_->MultiGet(ro, keys, &values_lsm);
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1");
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2");
values->resize(keys.size());
assert(statuses.size() == keys.size());
assert(values_lsm.size() == keys.size());
for (size_t i = 0; i < keys.size(); ++i) {
if (!statuses[i].ok()) {
continue;
}
Status s = CommonGet(keys[i], values_lsm[i], &((*values)[i]));
statuses[i] = s;
std::vector<Status> statuses;
statuses.reserve(keys.size());
values->clear();
values->reserve(keys.size());
PinnableSlice value;
for (size_t i = 0; i < keys.size(); i++) {
statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
values->push_back(value.ToString());
value.Reset();
}
if (snapshot_created) {
db_->ReleaseSnapshot(ro.snapshot);
@ -1143,12 +1128,11 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
return true;
}
Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
std::string* value) {
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value) {
assert(value != nullptr);
Slice index_entry_slice(index_entry);
BlobHandle handle;
Status s = handle.DecodeFrom(&index_entry_slice);
Status s = handle.DecodeFrom(index_entry);
if (!s.ok()) return s;
// offset has to have certain min, as we will read CRC
@ -1179,9 +1163,8 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
bfile = hitr->second;
}
// 0 - size
if (!handle.size() && value != nullptr) {
value->clear();
if (handle.size() == 0 && value != nullptr) {
value->PinSelf("");
return Status::OK();
}
@ -1189,7 +1172,7 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
std::shared_ptr<RandomAccessFileReader> reader =
GetOrOpenRandomAccessReader(bfile, env_, env_options_);
std::string* valueptr = value;
std::string* valueptr = value->GetSelf();
std::string value_c;
if (bdb_options_.compression != kNoCompression) {
valueptr = &value_c;
@ -1251,9 +1234,11 @@ Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
*(cfh->cfd()->ioptions()));
*value = contents.data.ToString();
*(value->GetSelf()) = contents.data.ToString();
}
value->PinSelf();
return s;
}
@ -1271,13 +1256,16 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
bool snapshot_created = SetSnapshotIfNeeded(&ro);
Status s;
std::string index_entry;
s = db_->Get(ro, key, &index_entry);
bool is_blob_index = false;
s = db_impl_->GetImpl(ro, column_family, key, value, nullptr /*value_found*/,
nullptr /*read_callback*/, &is_blob_index);
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
if (s.ok()) {
s = CommonGet(key, index_entry, value->GetSelf());
value->PinSelf();
if (is_blob_index) {
PinnableSlice index_entry = std::move(*value);
s = GetBlobValue(key, index_entry, value);
}
}
if (snapshot_created) {
db_->ReleaseSnapshot(ro.snapshot);
@ -1285,15 +1273,6 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
return s;
}
Slice BlobDBIterator::value() const {
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1");
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2");
Slice index_entry = iter_->value();
Status s =
db_impl_->CommonGet(iter_->key(), index_entry.ToString(false), &vpart_);
return Slice(vpart_);
}
std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
if (aborted) return std::make_pair(false, -1);
@ -1411,14 +1390,13 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
return true;
}
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& lsmValue) {
Slice val(lsmValue);
bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) {
BlobHandle handle;
Status s = handle.DecodeFrom(&val);
Status s = handle.DecodeFrom(index_entry);
if (!s.ok()) {
ROCKS_LOG_INFO(db_options_.info_log,
"Could not parse lsm val in MarkBlobDeleted %s",
lsmValue.ToString().c_str());
index_entry.ToString().c_str());
return false;
}
bool succ = FindFileAndEvictABlob(handle.filenumber(), key.size(),
@ -1618,7 +1596,52 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
return std::make_pair(true, -1);
}
////////////////////////////////////////////////////////////////////////////////
// Write callback for garbage collection to check if key has been updated
// since last read. Similar to how OptimisticTransaction works. See inline
// comment in GCFileAndUpdateLSM().
class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
public:
GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
SequenceNumber upper_bound)
: cfd_(cfd), key_(key), upper_bound_(upper_bound) {}
virtual Status Callback(DB* db) override {
auto* db_impl = reinterpret_cast<DBImpl*>(db);
auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
SequenceNumber latest_seq = 0;
bool found_record_for_key = false;
bool is_blob_index = false;
Status s = db_impl->GetLatestSequenceForKey(
sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
&is_blob_index);
db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
if (!s.ok() && !s.IsNotFound()) {
// Error.
assert(!s.IsBusy());
return s;
}
if (s.IsNotFound()) {
assert(!found_record_for_key);
return Status::Busy("Key deleted");
}
assert(found_record_for_key);
assert(is_blob_index);
if (latest_seq > upper_bound_) {
return Status::Busy("Key overwritten");
}
return s;
}
virtual bool AllowWriteBatching() override { return false; }
private:
ColumnFamilyData* cfd_;
// Key to check
Slice key_;
// Upper bound of sequence number to proceed.
SequenceNumber upper_bound_;
};
// iterate over the blobs sequentially and check if the blob sequence number
// is the latest. If it is the latest, preserve it, otherwise delete it
// if it is TTL based, and the TTL has expired, then
@ -1631,7 +1654,6 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
//
// if it is not TTL based, then we can blow the key if the key has been
// DELETED in the LSM
////////////////////////////////////////////////////////////////////////////////
Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
GCStats* gc_stats) {
uint64_t now = EpochNow();
@ -1656,14 +1678,14 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
bool first_gc = bfptr->gc_once_after_open_;
ColumnFamilyHandle* cfh = bfptr->GetColumnFamily(db_);
auto* cfh = bfptr->GetColumnFamily(db_);
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto column_family_id = cfd->GetID();
bool has_ttl = header.HasTTL();
// this reads the key but skips the blob
Reader::ReadLevel shallow = Reader::kReadHeaderKey;
assert(opt_db_);
bool no_relocation_ttl = (has_ttl && now >= bfptr->GetTTLRange().second);
bool no_relocation_lsmdel = false;
@ -1683,59 +1705,52 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
BlobLogRecord record;
std::shared_ptr<BlobFile> newfile;
std::shared_ptr<Writer> new_writer;
Transaction* transaction = nullptr;
uint64_t blob_offset = 0;
bool retry = false;
static const WriteOptions kGarbageCollectionWriteOptions = []() {
WriteOptions write_options;
// It is ok to ignore column families that were dropped.
write_options.ignore_missing_column_families = true;
return write_options;
}();
while (true) {
assert(s.ok());
if (retry) {
// Retry in case transaction fail with Status::TryAgain.
retry = false;
} else {
// Read the next blob record.
Status read_record_status =
reader->ReadRecord(&record, shallow, &blob_offset);
// Exit if we reach the end of blob file.
// TODO(yiwu): properly handle ReadRecord error.
if (!read_record_status.ok()) {
break;
}
gc_stats->blob_count++;
}
transaction =
opt_db_->BeginTransaction(kGarbageCollectionWriteOptions,
OptimisticTransactionOptions(), transaction);
std::string index_entry;
Status get_status = transaction->GetForUpdate(ReadOptions(), cfh,
record.Key(), &index_entry);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate");
if (get_status.IsNotFound()) {
// Key has been deleted. Drop the blob record.
continue;
// Read the next blob record.
Status read_record_status =
reader->ReadRecord(&record, shallow, &blob_offset);
// Exit if we reach the end of blob file.
// TODO(yiwu): properly handle ReadRecord error.
if (!read_record_status.ok()) {
break;
}
if (!get_status.ok()) {
gc_stats->blob_count++;
// Similar to OptimisticTransaction, we obtain latest_seq from
// base DB, which is guaranteed to be no smaller than the sequence of
// current key. We use a WriteCallback on write to check the key sequence
// on write. If the key sequence is larger than latest_seq, we know
// a new versions is inserted and the old blob can be disgard.
//
// We cannot use OptimisticTransaction because we need to pass
// is_blob_index flag to GetImpl.
SequenceNumber latest_seq = GetLatestSequenceNumber();
bool is_blob_index = false;
PinnableSlice index_entry;
Status get_status = db_impl_->GetImpl(
ReadOptions(), cfh, record.Key(), &index_entry, nullptr /*value_found*/,
nullptr /*read_callback*/, &is_blob_index);
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
if (!get_status.ok() && !get_status.ok()) {
// error
s = get_status;
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while getting index entry: %s",
s.ToString().c_str());
break;
}
if (get_status.IsNotFound() || !is_blob_index) {
// Either the key is deleted or updated with a newer version whish is
// inlined in LSM.
continue;
}
// TODO(yiwu): We should have an override of GetForUpdate returning a
// PinnableSlice.
Slice index_entry_slice(index_entry);
BlobHandle handle;
s = handle.DecodeFrom(&index_entry_slice);
s = handle.DecodeFrom(index_entry);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Error while decoding index entry: %s",
@ -1748,21 +1763,24 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
continue;
}
GarbageCollectionWriteCallback callback(cfd, record.Key(), latest_seq);
// If key has expired, remove it from base DB.
if (no_relocation_ttl || (has_ttl && now >= record.GetTTL())) {
gc_stats->num_deletes++;
gc_stats->deleted_size += record.GetBlobSize();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
transaction->Delete(cfh, record.Key());
Status delete_status = transaction->Commit();
WriteBatch delete_batch;
Status delete_status = delete_batch.Delete(record.Key());
if (delete_status.ok()) {
delete_status = db_impl_->WriteWithCallback(WriteOptions(),
&delete_batch, &callback);
}
if (delete_status.ok()) {
gc_stats->delete_succeeded++;
} else if (delete_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->overwritten_while_delete++;
} else if (delete_status.IsTryAgain()) {
// Retry the transaction.
retry = true;
} else {
// We hit an error.
s = delete_status;
@ -1829,29 +1847,27 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
BlobLogRecord::kHeaderSize + record.Key().size() + record.Blob().size();
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
transaction->Put(cfh, record.Key(), new_index_entry);
Status put_status = transaction->Commit();
if (put_status.ok()) {
WriteBatch rewrite_batch;
Status rewrite_status = WriteBatchInternal::PutBlobIndex(
&rewrite_batch, column_family_id, record.Key(), new_index_entry);
if (rewrite_status.ok()) {
rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
&rewrite_batch, &callback);
}
if (rewrite_status.ok()) {
gc_stats->relocate_succeeded++;
} else if (put_status.IsBusy()) {
} else if (rewrite_status.IsBusy()) {
// The key is overwritten in the meanwhile. Drop the blob record.
gc_stats->overwritten_while_relocate++;
} else if (put_status.IsTryAgain()) {
// Retry the transaction.
// TODO(yiwu): On retry, we can reuse the new blob record.
retry = true;
} else {
// We hit an error.
s = put_status;
s = rewrite_status;
ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
s.ToString().c_str());
break;
}
} // end of ReadRecord loop
if (transaction != nullptr) {
delete transaction;
}
ROCKS_LOG_INFO(
db_options_.info_log,
"%s blob file %" PRIu64
@ -2195,12 +2211,20 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
}
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
// Get a snapshot to avoid blob file get deleted between we
// fetch and index entry and reading from the file.
ReadOptions ro(read_options);
bool snapshot_created = SetSnapshotIfNeeded(&ro);
return new BlobDBIterator(db_->NewIterator(ro), this, snapshot_created,
ro.snapshot);
ManagedSnapshot* own_snapshot = nullptr;
const Snapshot* snapshot = read_options.snapshot;
if (snapshot == nullptr) {
own_snapshot = new ManagedSnapshot(db_);
snapshot = own_snapshot->snapshot();
}
auto* iter = db_impl_->NewIteratorImpl(
read_options, cfd, snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/);
return new BlobDBIterator(own_snapshot, iter, this);
}
Status DestroyBlobDB(const std::string& dbname, const Options& options,

View File

@ -18,6 +18,7 @@
#include <utility>
#include <vector>
#include "db/db_iter.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/listener.h"
@ -37,7 +38,6 @@ namespace rocksdb {
class DBImpl;
class ColumnFamilyHandle;
class ColumnFamilyData;
class OptimisticTransactionDBImpl;
struct FlushJobInfo;
namespace blob_db {
@ -215,9 +215,20 @@ class BlobDBImpl : public BlobDB {
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status GetBlobValue(const Slice& key, const Slice& index_entry,
PinnableSlice* value);
using BlobDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
using BlobDB::NewIterators;
virtual Status NewIterators(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override {
return Status::NotSupported("Not implemented");
}
using BlobDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& read_options,
@ -269,15 +280,14 @@ class BlobDBImpl : public BlobDB {
#endif // !NDEBUG
private:
class GarbageCollectionWriteCallback;
Status OpenPhase1();
// Create a snapshot if there isn't one in read options.
// Return true if a snapshot is created.
bool SetSnapshotIfNeeded(ReadOptions* read_options);
Status CommonGet(const Slice& key, const std::string& index_entry,
std::string* value);
Slice GetCompressedSlice(const Slice& raw,
std::string* compression_output) const;
@ -416,10 +426,6 @@ class BlobDBImpl : public BlobDB {
Env* env_;
TTLExtractor* ttl_extractor_;
// Optimistic Transaction DB used during Garbage collection
// for atomicity
std::unique_ptr<OptimisticTransactionDBImpl> opt_db_;
// a boolean to capture whether write_options has been set
std::atomic<bool> wo_set_;
WriteOptions write_options_;
@ -527,55 +533,6 @@ class BlobDBImpl : public BlobDB {
uint32_t debug_level_;
};
class BlobDBIterator : public Iterator {
public:
explicit BlobDBIterator(Iterator* iter, BlobDBImpl* impl, bool own_snapshot,
const Snapshot* snapshot)
: iter_(iter),
db_impl_(impl),
own_snapshot_(own_snapshot),
snapshot_(snapshot) {
assert(iter != nullptr);
assert(snapshot != nullptr);
}
~BlobDBIterator() {
if (own_snapshot_) {
db_impl_->ReleaseSnapshot(snapshot_);
}
delete iter_;
}
bool Valid() const override { return iter_->Valid(); }
void SeekToFirst() override { iter_->SeekToFirst(); }
void SeekToLast() override { iter_->SeekToLast(); }
void Seek(const Slice& target) override { iter_->Seek(target); }
void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
void Next() override { iter_->Next(); }
void Prev() override { iter_->Prev(); }
Slice key() const override { return iter_->key(); }
Slice value() const override;
Status status() const override { return iter_->status(); }
// Iterator::Refresh() not supported.
private:
Iterator* iter_;
BlobDBImpl* db_impl_;
bool own_snapshot_;
const Snapshot* snapshot_;
mutable std::string vpart_;
};
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -0,0 +1,104 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/iterator.h"
#include "utilities/blob_db/blob_db_impl.h"
namespace rocksdb {
namespace blob_db {
using rocksdb::ManagedSnapshot;
class BlobDBIterator : public Iterator {
public:
BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter,
BlobDBImpl* blob_db)
: snapshot_(snapshot), iter_(iter), blob_db_(blob_db) {}
virtual ~BlobDBIterator() = default;
bool Valid() const override {
if (!iter_->Valid()) {
return false;
}
return status_.ok();
}
Status status() const override {
if (!iter_->status().ok()) {
return iter_->status();
}
return status_;
}
void SeekToFirst() override {
iter_->SeekToFirst();
UpdateBlobValue();
}
void SeekToLast() override {
iter_->SeekToLast();
UpdateBlobValue();
}
void Seek(const Slice& target) override {
iter_->Seek(target);
UpdateBlobValue();
}
void SeekForPrev(const Slice& target) override {
iter_->SeekForPrev(target);
UpdateBlobValue();
}
void Next() override {
assert(Valid());
iter_->Next();
UpdateBlobValue();
}
void Prev() override {
assert(Valid());
iter_->Prev();
UpdateBlobValue();
}
Slice key() const override {
assert(Valid());
return iter_->key();
}
Slice value() const override {
assert(Valid());
if (!iter_->IsBlob()) {
return iter_->value();
}
return value_;
}
// Iterator::Refresh() not supported.
private:
void UpdateBlobValue() {
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
value_.Reset();
if (iter_->Valid() && iter_->IsBlob()) {
status_ = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
}
}
std::unique_ptr<ManagedSnapshot> snapshot_;
std::unique_ptr<ArenaWrappedDBIter> iter_;
BlobDBImpl* blob_db_;
Status status_;
PinnableSlice value_;
};
} // namespace blob_db
} // namespace rocksdb
#endif // !ROCKSDB_LITE

View File

@ -88,9 +88,14 @@ class BlobDBTest : public testing::Test {
void PutRandom(const std::string &key, Random *rnd,
std::map<std::string, std::string> *data = nullptr) {
PutRandom(blob_db_, key, rnd, data);
}
void PutRandom(DB *db, const std::string &key, Random *rnd,
std::map<std::string, std::string> *data = nullptr) {
int len = rnd->Next() % kMaxBlobSize + 1;
std::string value = test::RandomHumanReadableString(rnd, len);
ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value)));
ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
if (data != nullptr) {
(*data)[key] = value;
}
@ -116,9 +121,12 @@ class BlobDBTest : public testing::Test {
}
// Verify blob db contain expected data and nothing more.
// TODO(yiwu): Verify blob files are consistent with data in LSM.
void VerifyDB(const std::map<std::string, std::string> &data) {
Iterator *iter = blob_db_->NewIterator(ReadOptions());
VerifyDB(blob_db_, data);
}
void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
Iterator *iter = db->NewIterator(ReadOptions());
iter->SeekToFirst();
for (auto &p : data) {
ASSERT_TRUE(iter->Valid());
@ -593,7 +601,7 @@ TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
@ -630,7 +638,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
mock_env_->set_now_micros(300 * 1000000);
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetForUpdate",
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
"BlobDBImpl::PutUntil:Start"},
{"BlobDBImpl::PutUntil:Finish",
"BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
@ -687,7 +695,7 @@ TEST_F(BlobDBTest, GCOldestSimpleBlobFileWhenOutOfSpace) {
TEST_F(BlobDBTest, ReadWhileGC) {
// run the same test for Get(), MultiGet() and Iterator each.
for (int i = 0; i < 3; i++) {
for (int i = 0; i < 2; i++) {
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options);
@ -710,17 +718,10 @@ TEST_F(BlobDBTest, ReadWhileGC) {
break;
case 1:
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::MultiGet:AfterIndexEntryGet:1",
{{"BlobDBIterator::UpdateBlobValue:Start:1",
"BlobDBTest::ReadWhileGC:1"},
{"BlobDBTest::ReadWhileGC:2",
"BlobDBImpl::MultiGet:AfterIndexEntryGet:2"}});
break;
case 2:
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBIterator::value:BeforeGetBlob:1",
"BlobDBTest::ReadWhileGC:1"},
{"BlobDBTest::ReadWhileGC:2",
"BlobDBIterator::value:BeforeGetBlob:2"}});
"BlobDBIterator::UpdateBlobValue:Start:2"}});
break;
}
SyncPoint::GetInstance()->EnableProcessing();
@ -735,12 +736,6 @@ TEST_F(BlobDBTest, ReadWhileGC) {
ASSERT_EQ("bar", value);
break;
case 1:
statuses = blob_db_->MultiGet(ReadOptions(), {"foo"}, &values);
ASSERT_EQ(1, statuses.size());
ASSERT_EQ(1, values.size());
ASSERT_EQ("bar", values[0]);
break;
case 2:
// VerifyDB use iterator to scan the DB.
VerifyDB({{"foo", "bar"}});
break;
@ -834,6 +829,58 @@ TEST_F(BlobDBTest, GetLiveFilesMetaData) {
VerifyDB(data);
}
TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
constexpr size_t kNumKey = 20;
constexpr size_t kNumIteration = 10;
Random rnd(301);
std::map<std::string, std::string> data;
std::vector<bool> is_blob(kNumKey, false);
// Write to plain rocksdb.
Options options;
options.create_if_missing = true;
DB *db = nullptr;
ASSERT_OK(DB::Open(options, dbname_, &db));
for (size_t i = 0; i < kNumIteration; i++) {
auto key_index = rnd.Next() % kNumKey;
std::string key = "key" + ToString(key_index);
PutRandom(db, key, &rnd, &data);
}
VerifyDB(db, data);
delete db;
db = nullptr;
// Open as blob db. Verify it can read existing data.
Open();
VerifyDB(blob_db_, data);
for (size_t i = 0; i < kNumIteration; i++) {
auto key_index = rnd.Next() % kNumKey;
std::string key = "key" + ToString(key_index);
is_blob[key_index] = true;
PutRandom(blob_db_, key, &rnd, &data);
}
VerifyDB(blob_db_, data);
delete blob_db_;
blob_db_ = nullptr;
// Verify plain db return error for keys written by blob db.
ASSERT_OK(DB::Open(options, dbname_, &db));
std::string value;
for (size_t i = 0; i < kNumKey; i++) {
std::string key = "key" + ToString(i);
Status s = db->Get(ReadOptions(), key, &value);
if (data.count(key) == 0) {
ASSERT_TRUE(s.IsNotFound());
} else if (is_blob[i]) {
ASSERT_TRUE(s.IsNotSupported());
} else {
ASSERT_OK(s);
ASSERT_EQ(data[key], value);
}
}
delete db;
}
} // namespace blob_db
} // namespace rocksdb