From eae53de3b5dba6afb57dbff5a63f1bb21ffffbff Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 8 Sep 2017 10:57:12 -0700 Subject: [PATCH] Make it explicit blob db doesn't support CF Summary: Blob db doesn't currently support column families. Return NotSupported status explicitly. Closes https://github.com/facebook/rocksdb/pull/2825 Differential Revision: D5757438 Pulled By: yiwu-arbug fbshipit-source-id: 44de9408fd032c98e8ae337d4db4ed37169bd9fa --- utilities/blob_db/blob_db.h | 86 +++++++++++++++++----- utilities/blob_db/blob_db_impl.cc | 116 +++++++++++++++--------------- utilities/blob_db/blob_db_impl.h | 42 ++++------- utilities/blob_db/blob_db_test.cc | 33 +++++++++ 4 files changed, 174 insertions(+), 103 deletions(-) diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 8d6725f60..67463d07b 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -85,34 +85,55 @@ struct BlobDBOptions { class BlobDB : public StackableDB { public: using rocksdb::StackableDB::Put; - + virtual Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override = 0; virtual Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override = 0; + const Slice& value) override { + if (column_family != DefaultColumnFamily()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return Put(options, key, value); + } using rocksdb::StackableDB::Delete; virtual Status Delete(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key) override = 0; - - virtual Status PutWithTTL(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value, uint64_t ttl) = 0; + virtual Status Delete(const WriteOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key) override { + if (column_family != DefaultColumnFamily()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return Delete(options, key); + } virtual Status PutWithTTL(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) = 0; + virtual Status PutWithTTL(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, uint64_t ttl) { - return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl); + if (column_family != DefaultColumnFamily()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return PutWithTTL(options, key, value, ttl); } // Put with expiration. Key with expiration time equal to // std::numeric_limits::max() means the key don't expire. + virtual Status PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) = 0; virtual Status PutUntil(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value, uint64_t expiration) = 0; - - virtual Status PutUntil(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration) { - return PutUntil(options, DefaultColumnFamily(), key, value, expiration); + if (column_family != DefaultColumnFamily()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } + return PutUntil(options, key, value, expiration); } using rocksdb::StackableDB::Get; @@ -123,25 +144,52 @@ class BlobDB : public StackableDB { using rocksdb::StackableDB::MultiGet; virtual std::vector MultiGet( const ReadOptions& options, - const std::vector& column_family, const std::vector& keys, std::vector* values) override = 0; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_families, + const std::vector& keys, + std::vector* values) override { + for (auto column_family : column_families) { + if (column_family != DefaultColumnFamily()) { + return std::vector( + column_families.size(), + Status::NotSupported( + "Blob DB doesn't support non-default column family.")); + } + } + return MultiGet(options, keys, values); + } using rocksdb::StackableDB::SingleDelete; - virtual Status SingleDelete(const WriteOptions& wopts, - ColumnFamilyHandle* column_family, - const Slice& key) override = 0; + virtual Status SingleDelete(const WriteOptions& /*wopts*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/) override { + return Status::NotSupported("Not supported operation in blob db."); + } using rocksdb::StackableDB::Merge; - virtual Status Merge(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) override { + virtual Status Merge(const WriteOptions& /*options*/, + ColumnFamilyHandle* /*column_family*/, + const Slice& /*key*/, const Slice& /*value*/) override { return Status::NotSupported("Not supported operation in blob db."); } virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override = 0; + using rocksdb::StackableDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& options) override = 0; + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override { + if (column_family != DefaultColumnFamily()) { + // Blob DB doesn't support non-default column family. + return nullptr; + } + return NewIterator(options); + } + // Starting point for opening a Blob DB. // changed_options - critical. Blob DB loads and inserts listeners // into options which are necessary for recovery and atomicity diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 553f89f2a..777018aef 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -749,32 +749,20 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { return bfile; } -Status BlobDBImpl::Put(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, +Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key, const Slice& value) { std::string new_value; Slice value_slice; uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value); - return PutUntil(options, column_family, key, value_slice, expiration); + return PutUntil(options, key, value_slice, expiration); } -Status BlobDBImpl::Delete(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key) { +Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) { SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); - Status s = db_->Delete(options, column_family, key); + Status s = db_->Delete(options, key); // add deleted key to list of keys that have been deleted for book-keeping - delete_keys_q_.enqueue({column_family, key.ToString(), lsn}); - return s; -} - -Status BlobDBImpl::SingleDelete(const WriteOptions& wopts, - ColumnFamilyHandle* column_family, - const Slice& key) { - SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); - Status s = db_->SingleDelete(wopts, column_family, key); - - delete_keys_q_.enqueue({column_family, key.ToString(), lsn}); + delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn}); return s; } @@ -788,10 +776,17 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { std::shared_ptr last_file_; bool has_put_; std::string new_value_; + uint32_t default_cf_id_; public: explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq) - : impl_(impl), sequence_(seq), has_put_(false) {} + : impl_(impl), + sequence_(seq), + has_put_(false), + default_cf_id_(reinterpret_cast( + impl_->DefaultColumnFamily()) + ->cfd() + ->GetID()) {} WriteBatch& updates_blob() { return updates_blob_; } @@ -803,6 +798,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value_slice) override { + if (column_family_id != default_cf_id_) { + batch_rewrite_status_ = Status::NotSupported( + "Blob DB doesn't support non-default column family."); + return batch_rewrite_status_; + } Slice value_unc; uint64_t expiration = impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_); @@ -851,11 +851,28 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { + if (column_family_id != default_cf_id_) { + batch_rewrite_status_ = Status::NotSupported( + "Blob DB doesn't support non-default column family."); + return batch_rewrite_status_; + } WriteBatchInternal::Delete(&updates_blob_, column_family_id, key); sequence_++; return Status::OK(); } + virtual Status DeleteRange(uint32_t column_family_id, + const Slice& begin_key, const Slice& end_key) { + if (column_family_id != default_cf_id_) { + batch_rewrite_status_ = Status::NotSupported( + "Blob DB doesn't support non-default column family."); + return batch_rewrite_status_; + } + WriteBatchInternal::DeleteRange(&updates_blob_, column_family_id, + begin_key, end_key); + return Status::OK(); + } + virtual Status SingleDeleteCF(uint32_t /*column_family_id*/, const Slice& /*key*/) override { batch_rewrite_status_ = @@ -932,12 +949,11 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { } Status BlobDBImpl::PutWithTTL(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, uint64_t ttl) { uint64_t now = EpochNow(); assert(std::numeric_limits::max() - now > ttl); - return PutUntil(options, column_family, key, value, now + ttl); + return PutUntil(options, key, value, now + ttl); } Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, @@ -952,8 +968,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, return *compression_output; } -Status BlobDBImpl::PutUntil(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, +Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, const Slice& value_unc, uint64_t expiration) { TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start"); MutexLock l(&write_mutex_); @@ -992,14 +1007,11 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, bfile->PathName().c_str(), key.ToString().c_str(), value.size(), s.ToString().c_str(), bfile->DumpState().c_str()); - // Fallback just write to the LSM and get going - WriteBatch batch; - batch.Put(column_family, key, value); - return db_->Write(options, &batch); + return s; } WriteBatch batch; - batch.Put(column_family, key, index_entry); + batch.Put(key, index_entry); // this goes to the base db and can be expensive s = db_->Write(options, &batch); @@ -1123,7 +1135,6 @@ Status BlobDBImpl::AppendSN(const std::shared_ptr& bfile, std::vector BlobDBImpl::MultiGet( const ReadOptions& read_options, - const std::vector& column_family, const std::vector& keys, std::vector* values) { // Get a snapshot to avoid blob file get deleted between we // fetch and index entry and reading from the file. @@ -1131,21 +1142,18 @@ std::vector BlobDBImpl::MultiGet( bool snapshot_created = SetSnapshotIfNeeded(&ro); std::vector values_lsm; values_lsm.resize(keys.size()); - auto statuses = db_->MultiGet(ro, column_family, keys, &values_lsm); + auto statuses = db_->MultiGet(ro, keys, &values_lsm); TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:1"); TEST_SYNC_POINT("BlobDBImpl::MultiGet:AfterIndexEntryGet:2"); values->resize(keys.size()); assert(statuses.size() == keys.size()); + assert(values_lsm.size() == keys.size()); for (size_t i = 0; i < keys.size(); ++i) { if (!statuses[i].ok()) { continue; } - - auto cfh = reinterpret_cast(column_family[i]); - auto cfd = cfh->cfd(); - - Status s = CommonGet(cfd, keys[i], values_lsm[i], &((*values)[i])); + Status s = CommonGet(keys[i], values_lsm[i], &((*values)[i])); statuses[i] = s; } if (snapshot_created) { @@ -1163,9 +1171,8 @@ bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) { return true; } -Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, - const std::string& index_entry, std::string* value, - SequenceNumber* sequence) { +Status BlobDBImpl::CommonGet(const Slice& key, const std::string& index_entry, + std::string* value, SequenceNumber* sequence) { Slice index_entry_slice(index_entry); BlobHandle handle; Status s = handle.DecodeFrom(&index_entry_slice); @@ -1269,10 +1276,12 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, if (bdb_options_.compression != kNoCompression) { BlockContents contents; + auto cfh = + reinterpret_cast(DefaultColumnFamily()); s = UncompressBlockContentsForCompressionType( blob_value.data(), blob_value.size(), &contents, kBlockBasedTableVersionFormat, Slice(), bdb_options_.compression, - *(cfd->ioptions())); + *(cfh->cfd()->ioptions())); *value = contents.data.ToString(); } } @@ -1299,9 +1308,10 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key, Status BlobDBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { - auto cfh = reinterpret_cast(column_family); - auto cfd = cfh->cfd(); - + if (column_family != DefaultColumnFamily()) { + return Status::NotSupported( + "Blob DB doesn't support non-default column family."); + } // Get a snapshot to avoid blob file get deleted between we // fetch and index entry and reading from the file. // TODO(yiwu): For Get() retry if file not found would be a simpler strategy. @@ -1310,11 +1320,11 @@ Status BlobDBImpl::Get(const ReadOptions& read_options, Status s; std::string index_entry; - s = db_->Get(ro, column_family, key, &index_entry); + s = db_->Get(ro, key, &index_entry); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1"); TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2"); if (s.ok()) { - s = CommonGet(cfd, key, index_entry, value->GetSelf()); + s = CommonGet(key, index_entry, value->GetSelf()); value->PinSelf(); } if (snapshot_created) { @@ -1324,15 +1334,11 @@ Status BlobDBImpl::Get(const ReadOptions& read_options, } Slice BlobDBIterator::value() const { - Slice index_entry = iter_->value(); - - auto cfh = reinterpret_cast(cfh_); - auto cfd = cfh->cfd(); - TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:1"); TEST_SYNC_POINT("BlobDBIterator::value:BeforeGetBlob:2"); - Status s = db_impl_->CommonGet(cfd, iter_->key(), index_entry.ToString(false), - &vpart_); + Slice index_entry = iter_->value(); + Status s = + db_impl_->CommonGet(iter_->key(), index_entry.ToString(false), &vpart_); return Slice(vpart_); } @@ -2248,14 +2254,13 @@ std::pair BlobDBImpl::RunGC(bool aborted) { return std::make_pair(true, -1); } -Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) { +Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { // Get a snapshot to avoid blob file get deleted between we // fetch and index entry and reading from the file. ReadOptions ro(read_options); bool snapshot_created = SetSnapshotIfNeeded(&ro); - return new BlobDBIterator(db_->NewIterator(ro, column_family), column_family, - this, snapshot_created, ro.snapshot); + return new BlobDBIterator(db_->NewIterator(ro), this, snapshot_created, + ro.snapshot); } Status DestroyBlobDB(const std::string& dbname, const Options& options, @@ -2299,8 +2304,7 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key, if (!s.ok()) { return s; } - auto cfh = reinterpret_cast(DefaultColumnFamily()); - return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence); + return CommonGet(key, index_entry, nullptr, sequence); } std::vector> BlobDBImpl::TEST_GetBlobFiles() const { diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index e7c49b20d..d8dec6d4c 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -205,44 +205,34 @@ class BlobDBImpl : public BlobDB { // how often to schedule check seq files period static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000; - using rocksdb::StackableDB::Put; - Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, - const Slice& key, const Slice& value) override; + using BlobDB::Put; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override; - using rocksdb::StackableDB::Delete; - Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, - const Slice& key) override; + using BlobDB::Delete; + Status Delete(const WriteOptions& options, const Slice& key) override; - using rocksdb::StackableDB::SingleDelete; - virtual Status SingleDelete(const WriteOptions& wopts, - ColumnFamilyHandle* column_family, - const Slice& key) override; - - using rocksdb::StackableDB::Get; + using BlobDB::Get; Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; - using rocksdb::StackableDB::NewIterator; - virtual Iterator* NewIterator(const ReadOptions& read_options, - ColumnFamilyHandle* column_family) override; + using BlobDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& read_options) override; - using rocksdb::StackableDB::MultiGet; + using BlobDB::MultiGet; virtual std::vector MultiGet( const ReadOptions& read_options, - const std::vector& column_family, const std::vector& keys, std::vector* values) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; using BlobDB::PutWithTTL; - Status PutWithTTL(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, + Status PutWithTTL(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) override; using BlobDB::PutUntil; - Status PutUntil(const WriteOptions& options, - ColumnFamilyHandle* column_family, const Slice& key, + Status PutUntil(const WriteOptions& options, const Slice& key, const Slice& value_unc, uint64_t expiration) override; Status LinkToBaseDB(DB* db) override; @@ -282,9 +272,8 @@ class BlobDBImpl : public BlobDB { // Return true if a snapshot is created. bool SetSnapshotIfNeeded(ReadOptions* read_options); - Status CommonGet(const ColumnFamilyData* cfd, const Slice& key, - const std::string& index_entry, std::string* value, - SequenceNumber* sequence = nullptr); + Status CommonGet(const Slice& key, const std::string& index_entry, + std::string* value, SequenceNumber* sequence = nullptr); Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const; @@ -705,11 +694,9 @@ class BlobFile { class BlobDBIterator : public Iterator { public: - explicit BlobDBIterator(Iterator* iter, ColumnFamilyHandle* column_family, - BlobDBImpl* impl, bool own_snapshot, + explicit BlobDBIterator(Iterator* iter, BlobDBImpl* impl, bool own_snapshot, const Snapshot* snapshot) : iter_(iter), - cfh_(column_family), db_impl_(impl), own_snapshot_(own_snapshot), snapshot_(snapshot) { @@ -748,7 +735,6 @@ class BlobDBIterator : public Iterator { private: Iterator* iter_; - ColumnFamilyHandle* cfh_; BlobDBImpl* db_impl_; bool own_snapshot_; const Snapshot* snapshot_; diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 63b08bf45..6e8c9af4b 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -803,6 +803,39 @@ TEST_F(BlobDBTest, ReadWhileGC) { } } +TEST_F(BlobDBTest, ColumnFamilyNotSupported) { + Options options; + options.env = mock_env_.get(); + mock_env_->set_now_micros(0); + Open(BlobDBOptions(), options); + ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily(); + ColumnFamilyHandle *handle = nullptr; + std::string value; + std::vector values; + // The call simply pass through to base db. It should succeed. + ASSERT_OK( + blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle)); + ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported()); + ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60) + .IsNotSupported()); + ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100) + .IsNotSupported()); + WriteBatch batch; + batch.Put("k1", "v1"); + batch.Put(handle, "k2", "v2"); + ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported()); + ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_TRUE( + blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported()); + auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle}, + {"k1", "k2"}, &values); + ASSERT_EQ(2, statuses.size()); + ASSERT_TRUE(statuses[0].IsNotSupported()); + ASSERT_TRUE(statuses[1].IsNotSupported()); + ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle)); + delete handle; +} + } // namespace blob_db } // namespace rocksdb