Make it explicit blob db doesn't support CF
Summary: Blob db doesn't currently support column families. Return NotSupported status explicitly. Closes https://github.com/facebook/rocksdb/pull/2825 Differential Revision: D5757438 Pulled By: yiwu-arbug fbshipit-source-id: 44de9408fd032c98e8ae337d4db4ed37169bd9fa
This commit is contained in:
parent
65aec19df1
commit
eae53de3b5
@ -85,34 +85,55 @@ struct BlobDBOptions {
|
|||||||
class BlobDB : public StackableDB {
|
class BlobDB : public StackableDB {
|
||||||
public:
|
public:
|
||||||
using rocksdb::StackableDB::Put;
|
using rocksdb::StackableDB::Put;
|
||||||
|
virtual Status Put(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value) override = 0;
|
||||||
virtual Status Put(const WriteOptions& options,
|
virtual Status Put(const WriteOptions& options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
const Slice& value) override = 0;
|
const Slice& value) override {
|
||||||
|
if (column_family != DefaultColumnFamily()) {
|
||||||
|
return Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
}
|
||||||
|
return Put(options, key, value);
|
||||||
|
}
|
||||||
|
|
||||||
using rocksdb::StackableDB::Delete;
|
using rocksdb::StackableDB::Delete;
|
||||||
virtual Status Delete(const WriteOptions& options,
|
virtual Status Delete(const WriteOptions& options,
|
||||||
ColumnFamilyHandle* column_family,
|
|
||||||
const Slice& key) override = 0;
|
const Slice& key) override = 0;
|
||||||
|
virtual Status Delete(const WriteOptions& options,
|
||||||
virtual Status PutWithTTL(const WriteOptions& options,
|
ColumnFamilyHandle* column_family,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
const Slice& key) override {
|
||||||
const Slice& value, uint64_t ttl) = 0;
|
if (column_family != DefaultColumnFamily()) {
|
||||||
|
return Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
}
|
||||||
|
return Delete(options, key);
|
||||||
|
}
|
||||||
|
|
||||||
virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value, uint64_t ttl) = 0;
|
||||||
|
virtual Status PutWithTTL(const WriteOptions& options,
|
||||||
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
const Slice& value, uint64_t ttl) {
|
const Slice& value, uint64_t ttl) {
|
||||||
return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl);
|
if (column_family != DefaultColumnFamily()) {
|
||||||
|
return Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
}
|
||||||
|
return PutWithTTL(options, key, value, ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put with expiration. Key with expiration time equal to
|
// Put with expiration. Key with expiration time equal to
|
||||||
// std::numeric_limits<uint64_t>::max() means the key don't expire.
|
// std::numeric_limits<uint64_t>::max() means the key don't expire.
|
||||||
|
virtual Status PutUntil(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value, uint64_t expiration) = 0;
|
||||||
virtual Status PutUntil(const WriteOptions& options,
|
virtual Status PutUntil(const WriteOptions& options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
const Slice& value, uint64_t expiration) = 0;
|
|
||||||
|
|
||||||
virtual Status PutUntil(const WriteOptions& options, const Slice& key,
|
|
||||||
const Slice& value, uint64_t expiration) {
|
const Slice& value, uint64_t expiration) {
|
||||||
return PutUntil(options, DefaultColumnFamily(), key, value, expiration);
|
if (column_family != DefaultColumnFamily()) {
|
||||||
|
return Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
}
|
||||||
|
return PutUntil(options, key, value, expiration);
|
||||||
}
|
}
|
||||||
|
|
||||||
using rocksdb::StackableDB::Get;
|
using rocksdb::StackableDB::Get;
|
||||||
@ -123,25 +144,52 @@ class BlobDB : public StackableDB {
|
|||||||
using rocksdb::StackableDB::MultiGet;
|
using rocksdb::StackableDB::MultiGet;
|
||||||
virtual std::vector<Status> MultiGet(
|
virtual std::vector<Status> MultiGet(
|
||||||
const ReadOptions& options,
|
const ReadOptions& options,
|
||||||
const std::vector<ColumnFamilyHandle*>& column_family,
|
|
||||||
const std::vector<Slice>& keys,
|
const std::vector<Slice>& keys,
|
||||||
std::vector<std::string>* values) override = 0;
|
std::vector<std::string>* values) override = 0;
|
||||||
|
virtual std::vector<Status> MultiGet(
|
||||||
|
const ReadOptions& options,
|
||||||
|
const std::vector<ColumnFamilyHandle*>& column_families,
|
||||||
|
const std::vector<Slice>& keys,
|
||||||
|
std::vector<std::string>* values) override {
|
||||||
|
for (auto column_family : column_families) {
|
||||||
|
if (column_family != DefaultColumnFamily()) {
|
||||||
|
return std::vector<Status>(
|
||||||
|
column_families.size(),
|
||||||
|
Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family."));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return MultiGet(options, keys, values);
|
||||||
|
}
|
||||||
|
|
||||||
using rocksdb::StackableDB::SingleDelete;
|
using rocksdb::StackableDB::SingleDelete;
|
||||||
virtual Status SingleDelete(const WriteOptions& wopts,
|
virtual Status SingleDelete(const WriteOptions& /*wopts*/,
|
||||||
ColumnFamilyHandle* column_family,
|
ColumnFamilyHandle* /*column_family*/,
|
||||||
const Slice& key) override = 0;
|
const Slice& /*key*/) override {
|
||||||
|
return Status::NotSupported("Not supported operation in blob db.");
|
||||||
|
}
|
||||||
|
|
||||||
using rocksdb::StackableDB::Merge;
|
using rocksdb::StackableDB::Merge;
|
||||||
virtual Status Merge(const WriteOptions& options,
|
virtual Status Merge(const WriteOptions& /*options*/,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* /*column_family*/,
|
||||||
const Slice& value) override {
|
const Slice& /*key*/, const Slice& /*value*/) override {
|
||||||
return Status::NotSupported("Not supported operation in blob db.");
|
return Status::NotSupported("Not supported operation in blob db.");
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status Write(const WriteOptions& opts,
|
virtual Status Write(const WriteOptions& opts,
|
||||||
WriteBatch* updates) override = 0;
|
WriteBatch* updates) override = 0;
|
||||||
|
|
||||||
|
using rocksdb::StackableDB::NewIterator;
|
||||||
|
virtual Iterator* NewIterator(const ReadOptions& options) override = 0;
|
||||||
|
virtual Iterator* NewIterator(const ReadOptions& options,
|
||||||
|
ColumnFamilyHandle* column_family) override {
|
||||||
|
if (column_family != DefaultColumnFamily()) {
|
||||||
|
// Blob DB doesn't support non-default column family.
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
return NewIterator(options);
|
||||||
|
}
|
||||||
|
|
||||||
// Starting point for opening a Blob DB.
|
// Starting point for opening a Blob DB.
|
||||||
// changed_options - critical. Blob DB loads and inserts listeners
|
// changed_options - critical. Blob DB loads and inserts listeners
|
||||||
// into options which are necessary for recovery and atomicity
|
// into options which are necessary for recovery and atomicity
|
||||||
|
@ -749,32 +749,20 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
|||||||
return bfile;
|
return bfile;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::Put(const WriteOptions& options,
|
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
const Slice& value) {
|
const Slice& value) {
|
||||||
std::string new_value;
|
std::string new_value;
|
||||||
Slice value_slice;
|
Slice value_slice;
|
||||||
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
|
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
|
||||||
return PutUntil(options, column_family, key, value_slice, expiration);
|
return PutUntil(options, key, value_slice, expiration);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::Delete(const WriteOptions& options,
|
Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) {
|
||||||
ColumnFamilyHandle* column_family, const Slice& key) {
|
|
||||||
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
|
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
|
||||||
Status s = db_->Delete(options, column_family, key);
|
Status s = db_->Delete(options, key);
|
||||||
|
|
||||||
// add deleted key to list of keys that have been deleted for book-keeping
|
// add deleted key to list of keys that have been deleted for book-keeping
|
||||||
delete_keys_q_.enqueue({column_family, key.ToString(), lsn});
|
delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn});
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
Status BlobDBImpl::SingleDelete(const WriteOptions& wopts,
|
|
||||||
ColumnFamilyHandle* column_family,
|
|
||||||
const Slice& key) {
|
|
||||||
SequenceNumber lsn = db_impl_->GetLatestSequenceNumber();
|
|
||||||
Status s = db_->SingleDelete(wopts, column_family, key);
|
|
||||||
|
|
||||||
delete_keys_q_.enqueue({column_family, key.ToString(), lsn});
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -788,10 +776,17 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||||||
std::shared_ptr<BlobFile> last_file_;
|
std::shared_ptr<BlobFile> last_file_;
|
||||||
bool has_put_;
|
bool has_put_;
|
||||||
std::string new_value_;
|
std::string new_value_;
|
||||||
|
uint32_t default_cf_id_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
|
explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
|
||||||
: impl_(impl), sequence_(seq), has_put_(false) {}
|
: impl_(impl),
|
||||||
|
sequence_(seq),
|
||||||
|
has_put_(false),
|
||||||
|
default_cf_id_(reinterpret_cast<ColumnFamilyHandleImpl*>(
|
||||||
|
impl_->DefaultColumnFamily())
|
||||||
|
->cfd()
|
||||||
|
->GetID()) {}
|
||||||
|
|
||||||
WriteBatch& updates_blob() { return updates_blob_; }
|
WriteBatch& updates_blob() { return updates_blob_; }
|
||||||
|
|
||||||
@ -803,6 +798,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||||||
|
|
||||||
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
|
||||||
const Slice& value_slice) override {
|
const Slice& value_slice) override {
|
||||||
|
if (column_family_id != default_cf_id_) {
|
||||||
|
batch_rewrite_status_ = Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
return batch_rewrite_status_;
|
||||||
|
}
|
||||||
Slice value_unc;
|
Slice value_unc;
|
||||||
uint64_t expiration =
|
uint64_t expiration =
|
||||||
impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
|
impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
|
||||||
@ -851,11 +851,28 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||||||
|
|
||||||
virtual Status DeleteCF(uint32_t column_family_id,
|
virtual Status DeleteCF(uint32_t column_family_id,
|
||||||
const Slice& key) override {
|
const Slice& key) override {
|
||||||
|
if (column_family_id != default_cf_id_) {
|
||||||
|
batch_rewrite_status_ = Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
return batch_rewrite_status_;
|
||||||
|
}
|
||||||
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
|
WriteBatchInternal::Delete(&updates_blob_, column_family_id, key);
|
||||||
sequence_++;
|
sequence_++;
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual Status DeleteRange(uint32_t column_family_id,
|
||||||
|
const Slice& begin_key, const Slice& end_key) {
|
||||||
|
if (column_family_id != default_cf_id_) {
|
||||||
|
batch_rewrite_status_ = Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
return batch_rewrite_status_;
|
||||||
|
}
|
||||||
|
WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id,
|
||||||
|
begin_key, end_key);
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
|
virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
|
||||||
const Slice& /*key*/) override {
|
const Slice& /*key*/) override {
|
||||||
batch_rewrite_status_ =
|
batch_rewrite_status_ =
|
||||||
@ -932,12 +949,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
|
||||||
ColumnFamilyHandle* column_family,
|
|
||||||
const Slice& key, const Slice& value,
|
const Slice& key, const Slice& value,
|
||||||
uint64_t ttl) {
|
uint64_t ttl) {
|
||||||
uint64_t now = EpochNow();
|
uint64_t now = EpochNow();
|
||||||
assert(std::numeric_limits<uint64_t>::max() - now > ttl);
|
assert(std::numeric_limits<uint64_t>::max() - now > ttl);
|
||||||
return PutUntil(options, column_family, key, value, now + ttl);
|
return PutUntil(options, key, value, now + ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
||||||
@ -952,8 +968,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
|
|||||||
return *compression_output;
|
return *compression_output;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
const Slice& value_unc, uint64_t expiration) {
|
const Slice& value_unc, uint64_t expiration) {
|
||||||
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
|
TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
|
||||||
MutexLock l(&write_mutex_);
|
MutexLock l(&write_mutex_);
|
||||||
@ -992,14 +1007,11 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
|
|||||||
bfile->PathName().c_str(), key.ToString().c_str(),
|
bfile->PathName().c_str(), key.ToString().c_str(),
|
||||||
value.size(), s.ToString().c_str(),
|
value.size(), s.ToString().c_str(),
|
||||||
bfile->DumpState().c_str());
|
bfile->DumpState().c_str());
|
||||||
// Fallback just write to the LSM and get going
|
return s;
|
||||||
WriteBatch batch;
|
|
||||||
batch.Put(column_family, key, value);
|
|
||||||
return db_->Write(options, &batch);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
batch.Put(column_family, key, index_entry);
|
batch.Put(key, index_entry);
|
||||||
|
|
||||||
// this goes to the base db and can be expensive
|
// this goes to the base db and can be expensive
|
||||||
s = db_->Write(options, &batch);
|
s = db_->Write(options, &batch);
|
||||||
@ -1123,7 +1135,6 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr<BlobFile>& bfile,
|
|||||||
|
|
||||||
std::vector<Status> BlobDBImpl::MultiGet(
|
std::vector<Status> BlobDBImpl::MultiGet(
|
||||||
const ReadOptions& read_options,
|
const ReadOptions& read_options,
|
||||||
const std::vector<ColumnFamilyHandle*>& column_family,
|
|
||||||
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
||||||
// Get a snapshot to avoid blob file get deleted between we
|
// Get a snapshot to avoid blob file get deleted between we
|
||||||
// fetch and index entry and reading from the file.
|
// fetch and index entry and reading from the file.
|
||||||
@ -1131,21 +1142,18 @@ std::vector<Status> BlobDBImpl::MultiGet(
|
|||||||
bool snapshot_created = SetSnapshotIfNeeded(&ro);
|
bool snapshot_created = SetSnapshotIfNeeded(&ro);
|
||||||
std::vector<std::string> values_lsm;
|
std::vector<std::string> values_lsm;
|
||||||
values_lsm.resize(keys.size());
|
values_lsm.resize(keys.size());
|
||||||
auto statuses = db_->MultiGet(ro, column_family, keys, &values_lsm);
|
auto statuses = db_->MultiGet(ro, keys, &values_lsm);
|
||||||
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1");
|
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1");
|
||||||
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2");
|
TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2");
|
||||||
|
|
||||||
values->resize(keys.size());
|
values->resize(keys.size());
|
||||||
assert(statuses.size() == keys.size());
|
assert(statuses.size() == keys.size());
|
||||||
|
assert(values_lsm.size() == keys.size());
|
||||||
for (size_t i = 0; i < keys.size(); ++i) {
|
for (size_t i = 0; i < keys.size(); ++i) {
|
||||||
if (!statuses[i].ok()) {
|
if (!statuses[i].ok()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
Status s = CommonGet(keys[i], values_lsm[i], &((*values)[i]));
|
||||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
|
|
||||||
auto cfd = cfh->cfd();
|
|
||||||
|
|
||||||
Status s = CommonGet(cfd, keys[i], values_lsm[i], &((*values)[i]));
|
|
||||||
statuses[i] = s;
|
statuses[i] = s;
|
||||||
}
|
}
|
||||||
if (snapshot_created) {
|
if (snapshot_created) {
|
||||||
@ -1163,9 +1171,8 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
|
Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry,
|
||||||
const std::string& index_entry, std::string* value,
|
std::string* value, SequenceNumber* sequence) {
|
||||||
SequenceNumber* sequence) {
|
|
||||||
Slice index_entry_slice(index_entry);
|
Slice index_entry_slice(index_entry);
|
||||||
BlobHandle handle;
|
BlobHandle handle;
|
||||||
Status s = handle.DecodeFrom(&index_entry_slice);
|
Status s = handle.DecodeFrom(&index_entry_slice);
|
||||||
@ -1269,10 +1276,12 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
|
|||||||
|
|
||||||
if (bdb_options_.compression != kNoCompression) {
|
if (bdb_options_.compression != kNoCompression) {
|
||||||
BlockContents contents;
|
BlockContents contents;
|
||||||
|
auto cfh =
|
||||||
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
||||||
s = UncompressBlockContentsForCompressionType(
|
s = UncompressBlockContentsForCompressionType(
|
||||||
blob_value.data(), blob_value.size(), &contents,
|
blob_value.data(), blob_value.size(), &contents,
|
||||||
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
|
kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression,
|
||||||
*(cfd->ioptions()));
|
*(cfh->cfd()->ioptions()));
|
||||||
*value = contents.data.ToString();
|
*value = contents.data.ToString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1299,9 +1308,10 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
|
|||||||
Status BlobDBImpl::Get(const ReadOptions& read_options,
|
Status BlobDBImpl::Get(const ReadOptions& read_options,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
ColumnFamilyHandle* column_family, const Slice& key,
|
||||||
PinnableSlice* value) {
|
PinnableSlice* value) {
|
||||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
if (column_family != DefaultColumnFamily()) {
|
||||||
auto cfd = cfh->cfd();
|
return Status::NotSupported(
|
||||||
|
"Blob DB doesn't support non-default column family.");
|
||||||
|
}
|
||||||
// Get a snapshot to avoid blob file get deleted between we
|
// Get a snapshot to avoid blob file get deleted between we
|
||||||
// fetch and index entry and reading from the file.
|
// fetch and index entry and reading from the file.
|
||||||
// TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
|
// TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
|
||||||
@ -1310,11 +1320,11 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
|
|||||||
|
|
||||||
Status s;
|
Status s;
|
||||||
std::string index_entry;
|
std::string index_entry;
|
||||||
s = db_->Get(ro, column_family, key, &index_entry);
|
s = db_->Get(ro, key, &index_entry);
|
||||||
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
|
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
|
||||||
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
|
TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
s = CommonGet(cfd, key, index_entry, value->GetSelf());
|
s = CommonGet(key, index_entry, value->GetSelf());
|
||||||
value->PinSelf();
|
value->PinSelf();
|
||||||
}
|
}
|
||||||
if (snapshot_created) {
|
if (snapshot_created) {
|
||||||
@ -1324,15 +1334,11 @@ Status BlobDBImpl::Get(const ReadOptions& read_options,
|
|||||||
}
|
}
|
||||||
|
|
||||||
Slice BlobDBIterator::value() const {
|
Slice BlobDBIterator::value() const {
|
||||||
Slice index_entry = iter_->value();
|
|
||||||
|
|
||||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh_);
|
|
||||||
auto cfd = cfh->cfd();
|
|
||||||
|
|
||||||
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1");
|
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1");
|
||||||
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2");
|
TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2");
|
||||||
Status s = db_impl_->CommonGet(cfd, iter_->key(), index_entry.ToString(false),
|
Slice index_entry = iter_->value();
|
||||||
&vpart_);
|
Status s =
|
||||||
|
db_impl_->CommonGet(iter_->key(), index_entry.ToString(false), &vpart_);
|
||||||
return Slice(vpart_);
|
return Slice(vpart_);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2248,14 +2254,13 @@ std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
|
|||||||
return std::make_pair(true, -1);
|
return std::make_pair(true, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options,
|
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
|
||||||
ColumnFamilyHandle* column_family) {
|
|
||||||
// Get a snapshot to avoid blob file get deleted between we
|
// Get a snapshot to avoid blob file get deleted between we
|
||||||
// fetch and index entry and reading from the file.
|
// fetch and index entry and reading from the file.
|
||||||
ReadOptions ro(read_options);
|
ReadOptions ro(read_options);
|
||||||
bool snapshot_created = SetSnapshotIfNeeded(&ro);
|
bool snapshot_created = SetSnapshotIfNeeded(&ro);
|
||||||
return new BlobDBIterator(db_->NewIterator(ro, column_family), column_family,
|
return new BlobDBIterator(db_->NewIterator(ro), this, snapshot_created,
|
||||||
this, snapshot_created, ro.snapshot);
|
ro.snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DestroyBlobDB(const std::string& dbname, const Options& options,
|
Status DestroyBlobDB(const std::string& dbname, const Options& options,
|
||||||
@ -2299,8 +2304,7 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
return CommonGet(key, index_entry, nullptr, sequence);
|
||||||
return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
|
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
|
||||||
|
@ -205,44 +205,34 @@ class BlobDBImpl : public BlobDB {
|
|||||||
// how often to schedule check seq files period
|
// how often to schedule check seq files period
|
||||||
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
||||||
|
|
||||||
using rocksdb::StackableDB::Put;
|
using BlobDB::Put;
|
||||||
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
|
Status Put(const WriteOptions& options, const Slice& key,
|
||||||
const Slice& key, const Slice& value) override;
|
const Slice& value) override;
|
||||||
|
|
||||||
using rocksdb::StackableDB::Delete;
|
using BlobDB::Delete;
|
||||||
Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
|
Status Delete(const WriteOptions& options, const Slice& key) override;
|
||||||
const Slice& key) override;
|
|
||||||
|
|
||||||
using rocksdb::StackableDB::SingleDelete;
|
using BlobDB::Get;
|
||||||
virtual Status SingleDelete(const WriteOptions& wopts,
|
|
||||||
ColumnFamilyHandle* column_family,
|
|
||||||
const Slice& key) override;
|
|
||||||
|
|
||||||
using rocksdb::StackableDB::Get;
|
|
||||||
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
||||||
const Slice& key, PinnableSlice* value) override;
|
const Slice& key, PinnableSlice* value) override;
|
||||||
|
|
||||||
using rocksdb::StackableDB::NewIterator;
|
using BlobDB::NewIterator;
|
||||||
virtual Iterator* NewIterator(const ReadOptions& read_options,
|
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
|
||||||
ColumnFamilyHandle* column_family) override;
|
|
||||||
|
|
||||||
using rocksdb::StackableDB::MultiGet;
|
using BlobDB::MultiGet;
|
||||||
virtual std::vector<Status> MultiGet(
|
virtual std::vector<Status> MultiGet(
|
||||||
const ReadOptions& read_options,
|
const ReadOptions& read_options,
|
||||||
const std::vector<ColumnFamilyHandle*>& column_family,
|
|
||||||
const std::vector<Slice>& keys,
|
const std::vector<Slice>& keys,
|
||||||
std::vector<std::string>* values) override;
|
std::vector<std::string>* values) override;
|
||||||
|
|
||||||
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
|
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
|
||||||
|
|
||||||
using BlobDB::PutWithTTL;
|
using BlobDB::PutWithTTL;
|
||||||
Status PutWithTTL(const WriteOptions& options,
|
Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
const Slice& value, uint64_t ttl) override;
|
const Slice& value, uint64_t ttl) override;
|
||||||
|
|
||||||
using BlobDB::PutUntil;
|
using BlobDB::PutUntil;
|
||||||
Status PutUntil(const WriteOptions& options,
|
Status PutUntil(const WriteOptions& options, const Slice& key,
|
||||||
ColumnFamilyHandle* column_family, const Slice& key,
|
|
||||||
const Slice& value_unc, uint64_t expiration) override;
|
const Slice& value_unc, uint64_t expiration) override;
|
||||||
|
|
||||||
Status LinkToBaseDB(DB* db) override;
|
Status LinkToBaseDB(DB* db) override;
|
||||||
@ -282,9 +272,8 @@ class BlobDBImpl : public BlobDB {
|
|||||||
// Return true if a snapshot is created.
|
// Return true if a snapshot is created.
|
||||||
bool SetSnapshotIfNeeded(ReadOptions* read_options);
|
bool SetSnapshotIfNeeded(ReadOptions* read_options);
|
||||||
|
|
||||||
Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
|
Status CommonGet(const Slice& key, const std::string& index_entry,
|
||||||
const std::string& index_entry, std::string* value,
|
std::string* value, SequenceNumber* sequence = nullptr);
|
||||||
SequenceNumber* sequence = nullptr);
|
|
||||||
|
|
||||||
Slice GetCompressedSlice(const Slice& raw,
|
Slice GetCompressedSlice(const Slice& raw,
|
||||||
std::string* compression_output) const;
|
std::string* compression_output) const;
|
||||||
@ -705,11 +694,9 @@ class BlobFile {
|
|||||||
|
|
||||||
class BlobDBIterator : public Iterator {
|
class BlobDBIterator : public Iterator {
|
||||||
public:
|
public:
|
||||||
explicit BlobDBIterator(Iterator* iter, ColumnFamilyHandle* column_family,
|
explicit BlobDBIterator(Iterator* iter, BlobDBImpl* impl, bool own_snapshot,
|
||||||
BlobDBImpl* impl, bool own_snapshot,
|
|
||||||
const Snapshot* snapshot)
|
const Snapshot* snapshot)
|
||||||
: iter_(iter),
|
: iter_(iter),
|
||||||
cfh_(column_family),
|
|
||||||
db_impl_(impl),
|
db_impl_(impl),
|
||||||
own_snapshot_(own_snapshot),
|
own_snapshot_(own_snapshot),
|
||||||
snapshot_(snapshot) {
|
snapshot_(snapshot) {
|
||||||
@ -748,7 +735,6 @@ class BlobDBIterator : public Iterator {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
Iterator* iter_;
|
Iterator* iter_;
|
||||||
ColumnFamilyHandle* cfh_;
|
|
||||||
BlobDBImpl* db_impl_;
|
BlobDBImpl* db_impl_;
|
||||||
bool own_snapshot_;
|
bool own_snapshot_;
|
||||||
const Snapshot* snapshot_;
|
const Snapshot* snapshot_;
|
||||||
|
@ -803,6 +803,39 @@ TEST_F(BlobDBTest, ReadWhileGC) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
|
||||||
|
Options options;
|
||||||
|
options.env = mock_env_.get();
|
||||||
|
mock_env_->set_now_micros(0);
|
||||||
|
Open(BlobDBOptions(), options);
|
||||||
|
ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
|
||||||
|
ColumnFamilyHandle *handle = nullptr;
|
||||||
|
std::string value;
|
||||||
|
std::vector<std::string> values;
|
||||||
|
// The call simply pass through to base db. It should succeed.
|
||||||
|
ASSERT_OK(
|
||||||
|
blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
|
||||||
|
ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
|
||||||
|
ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
|
||||||
|
.IsNotSupported());
|
||||||
|
ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
|
||||||
|
.IsNotSupported());
|
||||||
|
WriteBatch batch;
|
||||||
|
batch.Put("k1", "v1");
|
||||||
|
batch.Put(handle, "k2", "v2");
|
||||||
|
ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
|
||||||
|
ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
|
||||||
|
ASSERT_TRUE(
|
||||||
|
blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
|
||||||
|
auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
|
||||||
|
{"k1", "k2"}, &values);
|
||||||
|
ASSERT_EQ(2, statuses.size());
|
||||||
|
ASSERT_TRUE(statuses[0].IsNotSupported());
|
||||||
|
ASSERT_TRUE(statuses[1].IsNotSupported());
|
||||||
|
ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
|
||||||
|
delete handle;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace blob_db
|
} // namespace blob_db
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user